Reactor Netty: emit actual HTTP client spans spans on connection errors (#9063)

This commit is contained in:
Mateusz Rzeszutek 2023-08-02 19:10:56 +02:00 committed by GitHub
parent 6e726dc188
commit 527c4b39e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 339 additions and 79 deletions

View File

@ -34,22 +34,22 @@ public final class NettyClientInstrumenterFactory {
private final OpenTelemetry openTelemetry; private final OpenTelemetry openTelemetry;
private final String instrumentationName; private final String instrumentationName;
private final boolean connectionTelemetryEnabled; private final NettyConnectionInstrumentationFlag connectionTelemetryState;
private final boolean sslTelemetryEnabled; private final NettyConnectionInstrumentationFlag sslTelemetryState;
private final Map<String, String> peerServiceMapping; private final Map<String, String> peerServiceMapping;
private final boolean emitExperimentalHttpClientMetrics; private final boolean emitExperimentalHttpClientMetrics;
public NettyClientInstrumenterFactory( public NettyClientInstrumenterFactory(
OpenTelemetry openTelemetry, OpenTelemetry openTelemetry,
String instrumentationName, String instrumentationName,
boolean connectionTelemetryEnabled, NettyConnectionInstrumentationFlag connectionTelemetryState,
boolean sslTelemetryEnabled, NettyConnectionInstrumentationFlag sslTelemetryState,
Map<String, String> peerServiceMapping, Map<String, String> peerServiceMapping,
boolean emitExperimentalHttpClientMetrics) { boolean emitExperimentalHttpClientMetrics) {
this.openTelemetry = openTelemetry; this.openTelemetry = openTelemetry;
this.instrumentationName = instrumentationName; this.instrumentationName = instrumentationName;
this.connectionTelemetryEnabled = connectionTelemetryEnabled; this.connectionTelemetryState = connectionTelemetryState;
this.sslTelemetryEnabled = sslTelemetryEnabled; this.sslTelemetryState = sslTelemetryState;
this.peerServiceMapping = peerServiceMapping; this.peerServiceMapping = peerServiceMapping;
this.emitExperimentalHttpClientMetrics = emitExperimentalHttpClientMetrics; this.emitExperimentalHttpClientMetrics = emitExperimentalHttpClientMetrics;
} }
@ -83,49 +83,38 @@ public final class NettyClientInstrumenterFactory {
} }
public NettyConnectionInstrumenter createConnectionInstrumenter() { public NettyConnectionInstrumenter createConnectionInstrumenter() {
InstrumenterBuilder<NettyConnectionRequest, Channel> instrumenterBuilder = if (connectionTelemetryState == NettyConnectionInstrumentationFlag.DISABLED) {
return NoopConnectionInstrumenter.INSTANCE;
}
boolean connectionTelemetryFullyEnabled =
connectionTelemetryState == NettyConnectionInstrumentationFlag.ENABLED;
Instrumenter<NettyConnectionRequest, Channel> instrumenter =
Instrumenter.<NettyConnectionRequest, Channel>builder( Instrumenter.<NettyConnectionRequest, Channel>builder(
openTelemetry, instrumentationName, NettyConnectionRequest::spanName) openTelemetry, instrumentationName, NettyConnectionRequest::spanName)
.addAttributesExtractor( .addAttributesExtractor(
PeerServiceAttributesExtractor.create( PeerServiceAttributesExtractor.create(
NettyConnectHttpAttributesGetter.INSTANCE, peerServiceMapping)); NettyConnectHttpAttributesGetter.INSTANCE, peerServiceMapping))
.addAttributesExtractor(
HttpClientAttributesExtractor.create(NettyConnectHttpAttributesGetter.INSTANCE))
.buildInstrumenter(
connectionTelemetryFullyEnabled
? SpanKindExtractor.alwaysInternal()
: SpanKindExtractor.alwaysClient());
if (connectionTelemetryEnabled) { return connectionTelemetryFullyEnabled
// TODO: this will most likely be no longer true with the new semconv, since the connection
// phase happens *before* the actual HTTP request is sent over the wire
// TODO (mateusz): refactor this after reactor-netty is fully converted to low-level HTTP
// instrumentation
// when the connection telemetry is enabled, we do not want these CONNECT spans to be
// suppressed by higher-level HTTP spans - this would happen in the reactor-netty
// instrumentation
// the solution for this is to deliberately avoid using the HTTP extractor and use the plain
// net attributes extractor instead
instrumenterBuilder.addAttributesExtractor(
NetClientAttributesExtractor.create(NettyConnectHttpAttributesGetter.INSTANCE));
} else {
// when the connection telemetry is not enabled, netty creates CONNECT spans whenever a
// connection error occurs - because there is no HTTP span in that scenario, if raw netty
// connection occurs before an HTTP message is even formed
// we don't want that span when a higher-level HTTP library (like reactor-netty or async http
// client) is used, the connection phase is a part of the HTTP span for these
// for that to happen, the CONNECT span will "pretend" to be a full HTTP span when connection
// telemetry is off
instrumenterBuilder.addAttributesExtractor(
HttpClientAttributesExtractor.create(NettyConnectHttpAttributesGetter.INSTANCE));
}
Instrumenter<NettyConnectionRequest, Channel> instrumenter =
instrumenterBuilder.buildInstrumenter(
connectionTelemetryEnabled
? SpanKindExtractor.alwaysInternal()
: SpanKindExtractor.alwaysClient());
return connectionTelemetryEnabled
? new NettyConnectionInstrumenterImpl(instrumenter) ? new NettyConnectionInstrumenterImpl(instrumenter)
: new NettyErrorOnlyConnectionInstrumenter(instrumenter); : new NettyErrorOnlyConnectionInstrumenter(instrumenter);
} }
public NettySslInstrumenter createSslInstrumenter() { public NettySslInstrumenter createSslInstrumenter() {
if (sslTelemetryState == NettyConnectionInstrumentationFlag.DISABLED) {
return NoopSslInstrumenter.INSTANCE;
}
boolean sslTelemetryFullyEnabled =
sslTelemetryState == NettyConnectionInstrumentationFlag.ENABLED;
NettySslNetAttributesGetter netAttributesGetter = new NettySslNetAttributesGetter(); NettySslNetAttributesGetter netAttributesGetter = new NettySslNetAttributesGetter();
Instrumenter<NettySslRequest, Void> instrumenter = Instrumenter<NettySslRequest, Void> instrumenter =
Instrumenter.<NettySslRequest, Void>builder( Instrumenter.<NettySslRequest, Void>builder(
@ -134,11 +123,11 @@ public final class NettyClientInstrumenterFactory {
.addAttributesExtractor( .addAttributesExtractor(
PeerServiceAttributesExtractor.create(netAttributesGetter, peerServiceMapping)) PeerServiceAttributesExtractor.create(netAttributesGetter, peerServiceMapping))
.buildInstrumenter( .buildInstrumenter(
sslTelemetryEnabled sslTelemetryFullyEnabled
? SpanKindExtractor.alwaysInternal() ? SpanKindExtractor.alwaysInternal()
: SpanKindExtractor.alwaysClient()); : SpanKindExtractor.alwaysClient());
return sslTelemetryEnabled return sslTelemetryFullyEnabled
? new NettySslInstrumenterImpl(instrumenter) ? new NettySslInstrumenterImpl(instrumenter)
: new NettySslErrorOnlyInstrumenter(instrumenter); : new NettySslErrorOnlyInstrumenter(instrumenter);
} }

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4.common.internal.client;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum NettyConnectionInstrumentationFlag {
ENABLED,
ERROR_ONLY,
DISABLED;
public static NettyConnectionInstrumentationFlag enabledOrErrorOnly(boolean b) {
return b ? ENABLED : ERROR_ONLY;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4.common.internal.client;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.netty.channel.Channel;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.common.internal.NettyConnectionRequest;
import javax.annotation.Nullable;
enum NoopConnectionInstrumenter implements NettyConnectionInstrumenter {
INSTANCE;
@Override
public boolean shouldStart(Context parentContext, NettyConnectionRequest request) {
return false;
}
@CanIgnoreReturnValue
@Override
public Context start(Context parentContext, NettyConnectionRequest request) {
return parentContext;
}
@Override
public void end(
Context context,
NettyConnectionRequest request,
Channel channel,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4.common.internal.client;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.context.Context;
import javax.annotation.Nullable;
enum NoopSslInstrumenter implements NettySslInstrumenter {
INSTANCE;
@Override
public boolean shouldStart(Context parentContext, NettySslRequest request) {
return false;
}
@CanIgnoreReturnValue
@Override
public Context start(Context parentContext, NettySslRequest request) {
return parentContext;
}
@Override
public void end(Context context, NettySslRequest request, @Nullable Throwable error) {}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
import static io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumentationFlag.enabledOrErrorOnly;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
@ -43,8 +45,8 @@ public final class NettyClientSingletons {
new NettyClientInstrumenterFactory( new NettyClientInstrumenterFactory(
GlobalOpenTelemetry.get(), GlobalOpenTelemetry.get(),
"io.opentelemetry.netty-4.0", "io.opentelemetry.netty-4.0",
connectionTelemetryEnabled, enabledOrErrorOnly(connectionTelemetryEnabled),
sslTelemetryEnabled, enabledOrErrorOnly(sslTelemetryEnabled),
CommonConfig.get().getPeerServiceMapping(), CommonConfig.get().getPeerServiceMapping(),
CommonConfig.get().shouldEmitExperimentalHttpClientMetrics()); CommonConfig.get().shouldEmitExperimentalHttpClientMetrics());
INSTRUMENTER = INSTRUMENTER =

View File

@ -5,6 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.netty.v4_1; package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
import static io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumentationFlag.enabledOrErrorOnly;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
@ -43,8 +45,8 @@ public final class NettyClientSingletons {
new NettyClientInstrumenterFactory( new NettyClientInstrumenterFactory(
GlobalOpenTelemetry.get(), GlobalOpenTelemetry.get(),
"io.opentelemetry.netty-4.1", "io.opentelemetry.netty-4.1",
connectionTelemetryEnabled, enabledOrErrorOnly(connectionTelemetryEnabled),
sslTelemetryEnabled, enabledOrErrorOnly(sslTelemetryEnabled),
CommonConfig.get().getPeerServiceMapping(), CommonConfig.get().getPeerServiceMapping(),
CommonConfig.get().shouldEmitExperimentalHttpClientMetrics()); CommonConfig.get().shouldEmitExperimentalHttpClientMetrics());
INSTRUMENTER = INSTRUMENTER =

View File

@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractorBuilder; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractorBuilder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory; import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumentationFlag;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -111,8 +112,8 @@ public final class NettyClientTelemetryBuilder {
new NettyClientInstrumenterFactory( new NettyClientInstrumenterFactory(
openTelemetry, openTelemetry,
"io.opentelemetry.netty-4.1", "io.opentelemetry.netty-4.1",
false, NettyConnectionInstrumentationFlag.DISABLED,
false, NettyConnectionInstrumentationFlag.DISABLED,
Collections.emptyMap(), Collections.emptyMap(),
emitExperimentalHttpClientMetrics) emitExperimentalHttpClientMetrics)
.createHttpInstrumenter(extractorConfigurer, additionalAttributesExtractors)); .createHttpInstrumenter(extractorConfigurer, additionalAttributesExtractors));

View File

@ -4,4 +4,5 @@ plugins {
dependencies { dependencies {
testImplementation(project(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent")) testImplementation(project(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent"))
testImplementation("io.projectreactor.netty:reactor-netty-http:1.0.0")
} }

View File

@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
@ExtendWith(MockitoExtension.class)
class FailedRequestWithUrlMakerTest {
@Mock HttpClientConfig config;
@Mock HttpClientRequest originalRequest;
@Test
void shouldUseAbsoluteUri() {
when(config.uri()).thenReturn("https://opentelemetry.io");
HttpClientRequest request = FailedRequestWithUrlMaker.create(config, originalRequest);
assertThat(request.resourceUrl()).isEqualTo("https://opentelemetry.io");
}
@ParameterizedTest
@ValueSource(strings = {"https://opentelemetry.io", "https://opentelemetry.io/"})
void shouldPrependBaseUrl(String baseUrl) {
when(config.baseUrl()).thenReturn(baseUrl);
when(config.uri()).thenReturn("/docs");
HttpClientRequest request = FailedRequestWithUrlMaker.create(config, originalRequest);
assertThat(request.resourceUrl()).isEqualTo("https://opentelemetry.io/docs");
}
@Test
void shouldPrependRemoteAddress() {
when(config.baseUrl()).thenReturn("/");
when(config.uri()).thenReturn("/docs");
Supplier<InetSocketAddress> remoteAddress =
() -> InetSocketAddress.createUnresolved("opentelemetry.io", 8080);
doReturn(remoteAddress).when(config).remoteAddress();
when(config.isSecure()).thenReturn(true);
HttpClientRequest request = FailedRequestWithUrlMaker.create(config, originalRequest);
assertThat(request.resourceUrl()).isEqualTo("https://opentelemetry.io:8080/docs");
}
@ParameterizedTest
@ArgumentsSource(DefaultPortsArguments.class)
void shouldSkipDefaultPorts(int port, boolean isSecure) {
when(config.baseUrl()).thenReturn("/");
when(config.uri()).thenReturn("/docs");
Supplier<InetSocketAddress> remoteAddress =
() -> InetSocketAddress.createUnresolved("opentelemetry.io", port);
doReturn(remoteAddress).when(config).remoteAddress();
when(config.isSecure()).thenReturn(isSecure);
HttpClientRequest request = FailedRequestWithUrlMaker.create(config, originalRequest);
assertThat(request.resourceUrl())
.isEqualTo((isSecure ? "https" : "http") + "://opentelemetry.io/docs");
}
static final class DefaultPortsArguments implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
return Stream.of(arguments(80, false), arguments(443, true));
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
final class FailedRequestWithUrlMaker {
static HttpClientRequest create(HttpClientConfig config, HttpClientRequest failedRequest) {
return (HttpClientRequest)
Proxy.newProxyInstance(
FailedRequestWithUrlMaker.class.getClassLoader(),
new Class<?>[] {HttpClientRequest.class},
new HttpRequestInvocationHandler(config, failedRequest));
}
private static final class HttpRequestInvocationHandler implements InvocationHandler {
private final HttpClientConfig config;
private final HttpClientRequest failedRequest;
private HttpRequestInvocationHandler(HttpClientConfig config, HttpClientRequest failedRequest) {
this.config = config;
this.failedRequest = failedRequest;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("resourceUrl".equals(method.getName())) {
return computeUrlFromConfig();
}
try {
return method.invoke(failedRequest, args);
} catch (InvocationTargetException exception) {
throw exception.getCause();
}
}
private String computeUrlFromConfig() {
String uri = config.uri();
if (isAbsolute(uri)) {
return uri;
}
// use the baseUrl if it was configured
String baseUrl = config.baseUrl();
// baseUrl is an actual scheme+host+port base url, and not just "/"
if (baseUrl != null && baseUrl.length() > 1) {
if (baseUrl.endsWith("/")) {
baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
}
return baseUrl + uri;
}
// otherwise, use the host+port config to construct the full url
SocketAddress hostAddress = config.remoteAddress().get();
if (hostAddress instanceof InetSocketAddress) {
InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress;
return (config.isSecure() ? "https://" : "http://")
+ inetHostAddress.getHostString()
+ computePortPart(inetHostAddress.getPort())
+ uri;
}
return uri;
}
private static boolean isAbsolute(String uri) {
return uri != null && !uri.isEmpty() && !uri.startsWith("/");
}
private String computePortPart(int port) {
boolean defaultPortValue =
(config.isSecure() && port == 443) || (!config.isSecure() && port == 80);
return defaultPortValue ? "" : (":" + port);
}
}
private FailedRequestWithUrlMaker() {}
}

View File

@ -16,6 +16,7 @@ import javax.annotation.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.netty.Connection; import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse; import reactor.netty.http.client.HttpClientResponse;
@ -31,13 +32,14 @@ public final class HttpResponseReceiverInstrumenter {
// implements ResponseReceiver // implements ResponseReceiver
if (receiver instanceof HttpClient) { if (receiver instanceof HttpClient) {
HttpClient client = (HttpClient) receiver; HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();
InstrumentationContexts instrumentationContexts = new InstrumentationContexts(); InstrumentationContexts instrumentationContexts = new InstrumentationContexts();
HttpClient modified = HttpClient modified =
client client
.mapConnect(new CaptureParentContext(instrumentationContexts)) .mapConnect(new CaptureParentContext(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts)) .doOnRequestError(new EndOperationWithRequestError(config, instrumentationContexts))
.doOnRequest(new StartOperation(instrumentationContexts)) .doOnRequest(new StartOperation(instrumentationContexts))
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts)) .doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts)) .doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
@ -103,9 +105,12 @@ public final class HttpResponseReceiverInstrumenter {
private static final class EndOperationWithRequestError private static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> { implements BiConsumer<HttpClientRequest, Throwable> {
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts; private final InstrumentationContexts instrumentationContexts;
EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) { EndOperationWithRequestError(
HttpClientConfig config, InstrumentationContexts instrumentationContexts) {
this.config = config;
this.instrumentationContexts = instrumentationContexts; this.instrumentationContexts = instrumentationContexts;
} }
@ -114,15 +119,10 @@ public final class HttpResponseReceiverInstrumenter {
instrumentationContexts.endClientSpan(null, error); instrumentationContexts.endClientSpan(null, error);
if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) { if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
// TODO: emit connection error span // request is an instance of FailedHttpClientRequest, which does not implement a correct
// resourceUrl() method -- we have to work around that
// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter request = FailedRequestWithUrlMaker.create(config, request);
// currently it also creates that connection error span (when the connection telemetry is instrumentationContexts.startAndEndConnectionErrorSpan(request, error);
// turned off), but without HTTP semantics - it does not have access to any HTTP information
// after all
// it should be possible to completely disable it, and just start and end the span here
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
// I'll do that in a separate PR - for better readability
} }
} }
} }

View File

@ -9,6 +9,8 @@ import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.React
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -18,6 +20,7 @@ import reactor.netty.http.client.HttpClientResponse;
final class InstrumentationContexts { final class InstrumentationContexts {
private volatile Context parentContext; private volatile Context parentContext;
private volatile Timer timer;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e. // on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
// it calls the callback functions in that order); thus for a short moment there can be multiple // it calls the callback functions in that order); thus for a short moment there can be multiple
// coexisting HTTP client spans // coexisting HTTP client spans
@ -25,6 +28,7 @@ final class InstrumentationContexts {
void initialize(Context parentContext) { void initialize(Context parentContext) {
this.parentContext = HttpClientResend.initialize(parentContext); this.parentContext = HttpClientResend.initialize(parentContext);
timer = Timer.start();
} }
Context getParentContext() { Context getParentContext() {
@ -55,6 +59,15 @@ final class InstrumentationContexts {
} }
} }
void startAndEndConnectionErrorSpan(HttpClientRequest request, Throwable error) {
Context parentContext = this.parentContext;
if (instrumenter().shouldStart(parentContext, request)) {
Timer timer = this.timer;
InstrumenterUtil.startAndEnd(
instrumenter(), parentContext, request, null, error, timer.startTime(), timer.now());
}
}
static final class RequestAndContext { static final class RequestAndContext {
final HttpClientRequest request; final HttpClientRequest request;
final Context context; final Context context;

View File

@ -15,6 +15,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtrac
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.PeerServiceAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.net.PeerServiceAttributesExtractor;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory; import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumentationFlag;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter; import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig; import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
@ -70,8 +71,10 @@ public final class ReactorNettySingletons {
new NettyClientInstrumenterFactory( new NettyClientInstrumenterFactory(
GlobalOpenTelemetry.get(), GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME, INSTRUMENTATION_NAME,
connectionTelemetryEnabled, connectionTelemetryEnabled
false, ? NettyConnectionInstrumentationFlag.ENABLED
: NettyConnectionInstrumentationFlag.DISABLED,
NettyConnectionInstrumentationFlag.DISABLED,
CommonConfig.get().getPeerServiceMapping(), CommonConfig.get().getPeerServiceMapping(),
CommonConfig.get().shouldEmitExperimentalHttpClientMetrics()); CommonConfig.get().shouldEmitExperimentalHttpClientMetrics());
CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter(); CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter();

View File

@ -10,7 +10,6 @@ import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER; import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.catchThrowable; import static org.assertj.core.api.Assertions.catchThrowable;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
@ -124,29 +123,19 @@ abstract class AbstractReactorNettyHttpClientTest
return exception; return exception;
}); });
// TODO: see the comment in HttpResponseReceiverInstrumenter.EndOperationWithRequestError
optionsBuilder.setExpectedClientSpanNameMapper(
AbstractReactorNettyHttpClientTest::getExpectedClientSpanName);
optionsBuilder.setHttpAttributes(this::getHttpAttributes); optionsBuilder.setHttpAttributes(this::getHttpAttributes);
} }
private static String getExpectedClientSpanName(URI uri, String method) {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
return "CONNECT";
}
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
}
protected Set<AttributeKey<?>> getHttpAttributes(URI uri) { protected Set<AttributeKey<?>> getHttpAttributes(URI uri) {
Set<AttributeKey<?>> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
// unopened port or non routable address // unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString()) if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) { || "https://192.0.2.1/".equals(uri.toString())) {
return emptySet(); attributes.remove(SemanticAttributes.NET_PROTOCOL_NAME);
attributes.remove(SemanticAttributes.NET_PROTOCOL_VERSION);
} }
Set<AttributeKey<?>> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
if (uri.toString().contains("/read-timeout")) { if (uri.toString().contains("/read-timeout")) {
attributes.remove(SemanticAttributes.NET_PROTOCOL_NAME); attributes.remove(SemanticAttributes.NET_PROTOCOL_NAME);
attributes.remove(SemanticAttributes.NET_PROTOCOL_VERSION); attributes.remove(SemanticAttributes.NET_PROTOCOL_VERSION);

View File

@ -81,7 +81,6 @@ class ReactorNettyClientSslTest {
.hasNoParent() .hasNoParent()
.hasStatus(StatusData.error()) .hasStatus(StatusData.error())
.hasException(thrown), .hasException(thrown),
/* FIXME: this span will be brought back in the next PR, when connection error spans are reintroduced
span -> span ->
span.hasName("GET") span.hasName("GET")
.hasKind(CLIENT) .hasKind(CLIENT)
@ -95,7 +94,6 @@ class ReactorNettyClientSslTest {
equalTo(SemanticAttributes.HTTP_URL, uri), equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())), equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())),
*/
span -> span ->
span.hasName("RESOLVE") span.hasName("RESOLVE")
.hasKind(INTERNAL) .hasKind(INTERNAL)

View File

@ -148,7 +148,6 @@ class ReactorNettyConnectionSpanTest {
.hasNoParent() .hasNoParent()
.hasStatus(StatusData.error()) .hasStatus(StatusData.error())
.hasException(thrown), .hasException(thrown),
/* FIXME: this span will be brought back in the next PR, when connection error spans are reintroduced
span -> span ->
span.hasName("GET") span.hasName("GET")
.hasKind(CLIENT) .hasKind(CLIENT)
@ -160,7 +159,6 @@ class ReactorNettyConnectionSpanTest {
equalTo(SemanticAttributes.HTTP_URL, uri), equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)), equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)),
*/
span -> span ->
span.hasName("RESOLVE") span.hasName("RESOLVE")
.hasKind(INTERNAL) .hasKind(INTERNAL)