Implement HTTP resend spec for Reactor Netty (excl CONNECT spans) (#8111)

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Mateusz Rzeszutek 2023-07-21 16:37:48 +02:00 committed by GitHub
parent bd8ddf4db3
commit 718fa0da14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 285 additions and 252 deletions

View File

@ -74,12 +74,19 @@ public final class DecoratorFunctions {
@Nullable
private static Context getChannelContext(
ContextView contextView, PropagatedContext propagatedContext) {
InstrumentationContexts contexts =
contextView.getOrDefault(ReactorContextKeys.CONTEXTS_HOLDER_KEY, null);
if (contexts == null) {
return null;
}
Context context = null;
if (propagatedContext.useClientContext) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
context = contexts.getClientContext();
}
if (context == null) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
context = contexts.getParentContext();
}
return context;
}

View File

@ -5,22 +5,17 @@
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CONTEXTS_HOLDER_KEY;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
@ -29,25 +24,26 @@ public final class HttpResponseReceiverInstrumenter {
// this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP
// request processing
// it should be used just before one of the response*() methods is called - after this point the
// HTTP
// request is no longer modifiable by the user
// HTTP request is no longer modifiable by the user
@Nullable
public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseReceiver<?> receiver) {
// receiver should always be an HttpClientFinalizer, which both extends HttpClient and
// implements ResponseReceiver
if (receiver instanceof HttpClient) {
HttpClient client = (HttpClient) receiver;
HttpClientConfig config = client.configuration();
ContextHolder contextHolder = new ContextHolder();
InstrumentationContexts instrumentationContexts = new InstrumentationContexts();
HttpClient modified =
client
.mapConnect(new StartOperation(contextHolder, config))
.doOnRequest(new PropagateContext(contextHolder))
.doOnRequestError(new EndOperationWithRequestError(contextHolder, config))
.doOnResponseError(new EndOperationWithResponseError(contextHolder, config))
.doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config));
.mapConnect(new CaptureParentContext(instrumentationContexts))
.doOnRequestError(new EndOperationWithRequestError(instrumentationContexts))
.doOnRequest(new StartOperation(instrumentationContexts))
.doOnResponseError(new EndOperationWithResponseError(instrumentationContexts))
.doAfterResponseSuccess(new EndOperationWithSuccess(instrumentationContexts))
// end the current span on redirects; StartOperation will start another one for the
// next resend
.doOnRedirect(new EndOperationWithSuccess(instrumentationContexts));
// modified should always be an HttpClientFinalizer too
if (modified instanceof HttpClient.ResponseReceiver) {
@ -58,32 +54,13 @@ public final class HttpResponseReceiverInstrumenter {
return null;
}
static final class ContextHolder {
private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");
volatile Context parentContext;
volatile Context context;
void setContext(Context context) {
contextUpdater.set(this, context);
}
Context getAndRemoveContext() {
return contextUpdater.getAndSet(this, null);
}
}
static final class StartOperation
private static final class CaptureParentContext
implements Function<Mono<? extends Connection>, Mono<? extends Connection>> {
private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;
StartOperation(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
CaptureParentContext(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}
@Override
@ -91,118 +68,92 @@ public final class HttpResponseReceiverInstrumenter {
return Mono.defer(
() -> {
Context parentContext = Context.current();
contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}
Context context = instrumenter().start(parentContext, config);
contextHolder.setContext(context);
return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for
// callbacks
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
instrumentationContexts.initialize(parentContext);
// make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(
ctx -> ctx.put(CONTEXTS_HOLDER_KEY, instrumentationContexts));
})
.doOnCancel(
() -> {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, null, null);
});
// if there's still any span in flight, end it
.doOnCancel(() -> instrumentationContexts.endClientSpan(null, null));
}
}
static final class PropagateContext implements BiConsumer<HttpClientRequest, Connection> {
private static final class StartOperation implements BiConsumer<HttpClientRequest, Connection> {
private final ContextHolder contextHolder;
private final InstrumentationContexts instrumentationContexts;
PropagateContext(ContextHolder contextHolder) {
this.contextHolder = contextHolder;
StartOperation(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}
@Override
public void accept(HttpClientRequest httpClientRequest, Connection connection) {
Context context = contextHolder.context;
if (context != null) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE);
}
public void accept(HttpClientRequest request, Connection connection) {
Context context = instrumentationContexts.startClientSpan(request);
// also propagate the context to the underlying netty instrumentation
// if this span was suppressed and context is null, propagate parentContext - this will allow
// netty spans to be suppressed too
Context nettyParentContext = context == null ? contextHolder.parentContext : context;
Context nettyParentContext =
context == null ? instrumentationContexts.getParentContext() : context;
NettyClientTelemetry.setChannelContext(connection.channel(), nettyParentContext);
}
}
static final class EndOperationWithRequestError
private static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> {
private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;
EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithRequestError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}
@Override
public void accept(HttpClientRequest httpClientRequest, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
public void accept(HttpClientRequest request, Throwable error) {
instrumentationContexts.endClientSpan(null, error);
if (HttpClientResend.get(instrumentationContexts.getParentContext()) == 0) {
// TODO: emit connection error span
// FIXME: this branch requires lots of changes around the NettyConnectionInstrumenter
// currently it also creates that connection error span (when the connection telemetry is
// turned off), but without HTTP semantics - it does not have access to any HTTP information
// after all
// it should be possible to completely disable it, and just start and end the span here
// this requires lots of refactoring and pretty uninteresting changes in the netty code, so
// I'll do that in a separate PR - for better readability
}
instrumenter().end(context, config, null, error);
}
}
static final class EndOperationWithResponseError
private static final class EndOperationWithResponseError
implements BiConsumer<HttpClientResponse, Throwable> {
private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;
EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithResponseError(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}
@Override
public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, error);
instrumentationContexts.endClientSpan(response, error);
}
}
static final class EndOperationWithSuccess implements BiConsumer<HttpClientResponse, Connection> {
private static final class EndOperationWithSuccess
implements BiConsumer<HttpClientResponse, Connection> {
private final ContextHolder contextHolder;
private final HttpClientConfig config;
private final InstrumentationContexts instrumentationContexts;
EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
EndOperationWithSuccess(InstrumentationContexts instrumentationContexts) {
this.instrumentationContexts = instrumentationContexts;
}
@Override
public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, response, null);
instrumentationContexts.endClientSpan(response, null);
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResend;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
final class InstrumentationContexts {
private static final Logger logger = Logger.getLogger(InstrumentationContexts.class.getName());
private volatile Context parentContext;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
// it calls the callback functions in that order); thus for a short moment there can be 2
// coexisting HTTP client spans
private final Queue<RequestAndContext> clientContexts = new ArrayBlockingQueue<>(2, true);
void initialize(Context parentContext) {
this.parentContext = HttpClientResend.initialize(parentContext);
}
Context getParentContext() {
return parentContext;
}
@Nullable
Context getClientContext() {
RequestAndContext requestAndContext = clientContexts.peek();
return requestAndContext == null ? null : requestAndContext.context;
}
@Nullable
Context startClientSpan(HttpClientRequest request) {
Context parentContext = this.parentContext;
Context context = null;
if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request);
if (!clientContexts.offer(new RequestAndContext(request, context))) {
// should not ever happen in reality
String message =
"Could not instrument HTTP client request; not enough space in the request queue";
logger.log(Level.FINE, message);
instrumenter().end(context, request, null, new IllegalStateException(message));
}
}
return context;
}
void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) {
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error);
}
}
static final class RequestAndContext {
final HttpClientRequest request;
final Context context;
RequestAndContext(HttpClientRequest request, Context context) {
this.request = request;
this.context = context;
}
}
}

View File

@ -7,10 +7,8 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
public final class ReactorContextKeys {
public static final String CLIENT_PARENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-parent-context";
public static final String CLIENT_CONTEXT_KEY =
ReactorContextKeys.class.getName() + ".client-context";
public static final String CONTEXTS_HOLDER_KEY =
ReactorContextKeys.class.getName() + ".contexts-holder";
private ReactorContextKeys() {}
}

View File

@ -6,76 +6,38 @@
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesGetter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
final class ReactorNettyHttpClientAttributesGetter
implements HttpClientAttributesGetter<HttpClientConfig, HttpClientResponse> {
implements HttpClientAttributesGetter<HttpClientRequest, HttpClientResponse> {
@Override
public String getUrlFull(HttpClientConfig request) {
String uri = request.uri();
if (isAbsolute(uri)) {
return uri;
}
// use the baseUrl if it was configured
String baseUrl = request.baseUrl();
if (uri == null) {
// internally reactor netty appends "/" to the baseUrl
return baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
}
if (baseUrl != null) {
if (baseUrl.endsWith("/") && uri.startsWith("/")) {
baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
}
return baseUrl + uri;
}
// otherwise, use the host+port config to construct the full url
SocketAddress hostAddress = request.remoteAddress().get();
if (hostAddress instanceof InetSocketAddress) {
InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress;
return (request.isSecure() ? "https://" : "http://")
+ inetHostAddress.getHostString()
+ ":"
+ inetHostAddress.getPort()
+ (uri.startsWith("/") ? "" : "/")
+ uri;
}
return uri;
}
private static boolean isAbsolute(String uri) {
return uri != null && !uri.isEmpty() && !uri.startsWith("/");
public String getUrlFull(HttpClientRequest request) {
return request.resourceUrl();
}
@Override
public String getHttpRequestMethod(HttpClientConfig request) {
public String getHttpRequestMethod(HttpClientRequest request) {
return request.method().name();
}
@Override
public List<String> getHttpRequestHeader(HttpClientConfig request, String name) {
return request.headers().getAll(name);
public List<String> getHttpRequestHeader(HttpClientRequest request, String name) {
return request.requestHeaders().getAll(name);
}
@Override
public Integer getHttpResponseStatusCode(
HttpClientConfig request, HttpClientResponse response, @Nullable Throwable error) {
HttpClientRequest request, HttpClientResponse response, @Nullable Throwable error) {
return response.status().code();
}
@Override
public List<String> getHttpResponseHeader(
HttpClientConfig request, HttpClientResponse response, String name) {
HttpClientRequest request, HttpClientResponse response, String name) {
return response.responseHeaders().getAll(name);
}
}

View File

@ -11,16 +11,16 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
final class ReactorNettyNetClientAttributesGetter
implements NetClientAttributesGetter<HttpClientConfig, HttpClientResponse> {
implements NetClientAttributesGetter<HttpClientRequest, HttpClientResponse> {
@Nullable
@Override
public String getNetworkProtocolName(
HttpClientConfig request, @Nullable HttpClientResponse response) {
HttpClientRequest request, @Nullable HttpClientResponse response) {
if (response == null) {
return null;
}
@ -30,7 +30,7 @@ final class ReactorNettyNetClientAttributesGetter
@Nullable
@Override
public String getNetworkProtocolVersion(
HttpClientConfig request, @Nullable HttpClientResponse response) {
HttpClientRequest request, @Nullable HttpClientResponse response) {
if (response == null) {
return null;
}
@ -40,20 +40,20 @@ final class ReactorNettyNetClientAttributesGetter
@Nullable
@Override
public String getServerAddress(HttpClientConfig request) {
public String getServerAddress(HttpClientRequest request) {
return getHost(request);
}
@Nullable
@Override
public Integer getServerPort(HttpClientConfig request) {
public Integer getServerPort(HttpClientRequest request) {
return getPort(request);
}
@Nullable
@Override
public InetSocketAddress getServerInetSocketAddress(
HttpClientConfig request, @Nullable HttpClientResponse response) {
HttpClientRequest request, @Nullable HttpClientResponse response) {
// we're making use of the fact that HttpClientOperations is both a Connection and an
// HttpClientResponse
@ -68,30 +68,14 @@ final class ReactorNettyNetClientAttributesGetter
}
@Nullable
private static String getHost(HttpClientConfig request) {
String baseUrl = request.baseUrl();
String uri = request.uri();
if (baseUrl != null && !isAbsolute(uri)) {
return UrlParser.getHost(baseUrl);
} else {
return UrlParser.getHost(uri);
}
private static String getHost(HttpClientRequest request) {
String resourceUrl = request.resourceUrl();
return resourceUrl == null ? null : UrlParser.getHost(resourceUrl);
}
@Nullable
private static Integer getPort(HttpClientConfig request) {
String baseUrl = request.baseUrl();
String uri = request.uri();
if (baseUrl != null && !isAbsolute(uri)) {
return UrlParser.getPort(baseUrl);
} else {
return UrlParser.getPort(uri);
}
}
private static boolean isAbsolute(String uri) {
return uri != null && !uri.isEmpty() && !uri.startsWith("/");
private static Integer getPort(HttpClientRequest request) {
String resourceUrl = request.resourceUrl();
return resourceUrl == null ? null : UrlParser.getPort(resourceUrl);
}
}

View File

@ -8,7 +8,6 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientExperimentalMetrics;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics;
@ -20,7 +19,7 @@ import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyCon
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
public final class ReactorNettySingletons {
@ -39,7 +38,7 @@ public final class ReactorNettySingletons {
false);
}
private static final Instrumenter<HttpClientConfig, HttpClientResponse> INSTRUMENTER;
private static final Instrumenter<HttpClientRequest, HttpClientResponse> INSTRUMENTER;
private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER;
static {
@ -48,8 +47,8 @@ public final class ReactorNettySingletons {
ReactorNettyNetClientAttributesGetter netAttributesGetter =
new ReactorNettyNetClientAttributesGetter();
InstrumenterBuilder<HttpClientConfig, HttpClientResponse> builder =
Instrumenter.<HttpClientConfig, HttpClientResponse>builder(
InstrumenterBuilder<HttpClientRequest, HttpClientResponse> builder =
Instrumenter.<HttpClientRequest, HttpClientResponse>builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
HttpSpanNameExtractor.create(httpAttributesGetter))
@ -67,10 +66,7 @@ public final class ReactorNettySingletons {
if (CommonConfig.get().shouldEmitExperimentalHttpClientMetrics()) {
builder.addOperationMetrics(HttpClientExperimentalMetrics.get());
}
INSTRUMENTER =
builder
// headers are injected in ResponseReceiverInstrumenter
.buildInstrumenter(SpanKindExtractor.alwaysClient());
INSTRUMENTER = builder.buildClientInstrumenter(HttpClientRequestHeadersSetter.INSTANCE);
NettyClientInstrumenterFactory instrumenterFactory =
new NettyClientInstrumenterFactory(
@ -83,7 +79,7 @@ public final class ReactorNettySingletons {
CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter();
}
public static Instrumenter<HttpClientConfig, HttpClientResponse> instrumenter() {
public static Instrumenter<HttpClientRequest, HttpClientResponse> instrumenter() {
return INSTRUMENTER;
}

View File

@ -104,9 +104,13 @@ abstract class AbstractReactorNettyHttpClientTest
@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
optionsBuilder.disableTestRedirects();
optionsBuilder.markAsLowLevelInstrumentation();
optionsBuilder.setMaxRedirects(52);
optionsBuilder.setUserAgent(USER_AGENT);
optionsBuilder.enableTestCallbackWithImplicitParent();
// TODO: remove this test altogether? this scenario is (was) only implemented in reactor-netty,
// all other HTTP clients worked in a different way
// optionsBuilder.enableTestCallbackWithImplicitParent();
optionsBuilder.setClientSpanErrorMapper(
(uri, exception) -> {
@ -120,9 +124,21 @@ abstract class AbstractReactorNettyHttpClientTest
return exception;
});
// TODO: see the comment in HttpResponseReceiverInstrumenter.EndOperationWithRequestError
optionsBuilder.setExpectedClientSpanNameMapper(
AbstractReactorNettyHttpClientTest::getExpectedClientSpanName);
optionsBuilder.setHttpAttributes(this::getHttpAttributes);
}
private static String getExpectedClientSpanName(URI uri, String method) {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
return "CONNECT";
}
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
}
protected Set<AttributeKey<?>> getHttpAttributes(URI uri) {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
@ -185,7 +201,7 @@ abstract class AbstractReactorNettyHttpClientTest
span -> span.hasName("GET").hasKind(CLIENT).hasParent(parentSpan),
span -> span.hasName("test-http-server").hasKind(SERVER).hasParent(nettyClientSpan));
assertSameSpan(nettyClientSpan, onRequestSpan);
assertSameSpan(parentSpan, onRequestSpan);
assertSameSpan(nettyClientSpan, afterRequestSpan);
assertSameSpan(nettyClientSpan, onResponseSpan);
assertSameSpan(parentSpan, afterResponseSpan);
@ -306,6 +322,7 @@ abstract class AbstractReactorNettyHttpClientTest
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri.toString()),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, USER_AGENT),
equalTo(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 0),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, uri.getPort())),
span ->

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.AbstractReactorNettyHttpClientTest.USER_AGENT;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
@ -85,6 +86,7 @@ class ReactorNettyBaseUrlOnlyTest {
() ->
httpClient
.baseUrl(uri)
.headers(h -> h.set("User-Agent", USER_AGENT))
.get()
.responseSingle(
(resp, content) -> {
@ -108,7 +110,9 @@ class ReactorNettyBaseUrlOnlyTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri + "/"),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, USER_AGENT),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
equalTo(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 0),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.AbstractReactorNettyHttpClientTest.USER_AGENT;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
@ -80,6 +81,7 @@ class ReactorNettyClientSslTest {
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
/* FIXME: this span will be brought back in the next PR, when connection error spans are reintroduced
span ->
span.hasName("GET")
.hasKind(CLIENT)
@ -93,10 +95,11 @@ class ReactorNettyClientSslTest {
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort())),
*/
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -104,7 +107,7 @@ class ReactorNettyClientSslTest {
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -113,7 +116,7 @@ class ReactorNettyClientSslTest {
span ->
span.hasName("SSL handshake")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
// netty swallows the exception, it doesn't make any sense to hard-code the
// message
@ -132,6 +135,7 @@ class ReactorNettyClientSslTest {
Mono<HttpClientResponse> responseMono =
httpClient
.headers(h -> h.set("User-Agent", USER_AGENT))
.get()
.uri(uri)
.responseSingle(
@ -146,26 +150,10 @@ class ReactorNettyClientSslTest {
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span ->
span.hasName("GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -173,7 +161,7 @@ class ReactorNettyClientSslTest {
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -182,14 +170,32 @@ class ReactorNettyClientSslTest {
span ->
span.hasName("SSL handshake")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1"),
equalTo(SemanticAttributes.NET_SOCK_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_SOCK_PEER_PORT, server.httpsPort())),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1))));
span.hasName("GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, USER_AGENT),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
equalTo(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 0),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpsPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(4))));
}
private static HttpClient createHttpClient() throws SSLException {

View File

@ -8,6 +8,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.AbstractReactorNettyHttpClientTest.USER_AGENT;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
@ -55,6 +56,7 @@ class ReactorNettyConnectionSpanTest {
"parent",
() ->
httpClient
.headers(h -> h.set("User-Agent", USER_AGENT))
.get()
.uri(uri)
.responseSingle(
@ -72,26 +74,10 @@ class ReactorNettyConnectionSpanTest {
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
span -> span.hasName("parent").hasKind(INTERNAL).hasNoParent(),
span ->
span.hasName("GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -99,14 +85,32 @@ class ReactorNettyConnectionSpanTest {
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(1))));
span.hasName("GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.USER_AGENT_ORIGINAL, USER_AGENT),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
equalTo(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 0),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
AbstractLongAssert::isNotNegative),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, server.httpPort()),
equalTo(SemanticAttributes.NET_SOCK_PEER_ADDR, "127.0.0.1")),
span ->
span.hasName("test-http-server").hasKind(SERVER).hasParent(trace.getSpan(3))));
}
@Test
@ -137,13 +141,14 @@ class ReactorNettyConnectionSpanTest {
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactlyInAnyOrder(
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
/* FIXME: this span will be brought back in the next PR, when connection error spans are reintroduced
span ->
span.hasName("GET")
.hasKind(CLIENT)
@ -155,10 +160,11 @@ class ReactorNettyConnectionSpanTest {
equalTo(SemanticAttributes.HTTP_URL, uri),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, PortUtils.UNUSABLE_PORT)),
*/
span ->
span.hasName("RESOLVE")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NET_TRANSPORT, IP_TCP),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
@ -166,7 +172,7 @@ class ReactorNettyConnectionSpanTest {
span ->
span.hasName("CONNECT")
.hasKind(INTERNAL)
.hasParent(trace.getSpan(1))
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasException(connectException)
.hasAttributesSatisfyingExactly(

View File

@ -78,6 +78,12 @@ public abstract class InstrumentationTestRunner {
waitAndAssertTraces(traceComparator, Arrays.asList(assertions), true);
}
public final void waitAndAssertSortedTraces(
Comparator<List<SpanData>> traceComparator,
Iterable<? extends Consumer<TraceAssert>> assertions) {
waitAndAssertTraces(traceComparator, assertions, true);
}
@SafeVarargs
@SuppressWarnings("varargs")
public final void waitAndAssertTracesWithoutScopeVersionVerification(

View File

@ -165,6 +165,12 @@ public abstract class InstrumentationExtension
testRunner.waitAndAssertSortedTraces(traceComparator, assertions);
}
public final void waitAndAssertSortedTraces(
Comparator<List<SpanData>> traceComparator,
Iterable<? extends Consumer<TraceAssert>> assertions) {
testRunner.waitAndAssertSortedTraces(traceComparator, assertions);
}
@SafeVarargs
@SuppressWarnings("varargs")
public final void waitAndAssertTracesWithoutScopeVersionVerification(

View File

@ -5,6 +5,7 @@
package io.opentelemetry.instrumentation.testing.junit.http;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.comparingRootSpanAttribute;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
@ -254,7 +255,8 @@ public abstract class AbstractHttpClientTest<REQUEST> implements HttpClientTypeA
assertThat(responseCode).isEqualTo(200);
if (options.isLowLevelInstrumentation()) {
testing.waitAndAssertTraces(
testing.waitAndAssertSortedTraces(
comparingRootSpanAttribute(SemanticAttributes.HTTP_RESEND_COUNT),
trace -> {
trace.hasSpansSatisfyingExactly(
span ->
@ -293,7 +295,8 @@ public abstract class AbstractHttpClientTest<REQUEST> implements HttpClientTypeA
assertThat(responseCode).isEqualTo(200);
if (options.isLowLevelInstrumentation()) {
testing.waitAndAssertTraces(
testing.waitAndAssertSortedTraces(
comparingRootSpanAttribute(SemanticAttributes.HTTP_RESEND_COUNT),
trace -> {
trace.hasSpansSatisfyingExactly(
span ->
@ -351,7 +354,8 @@ public abstract class AbstractHttpClientTest<REQUEST> implements HttpClientTypeA
Throwable clientError = options.getClientSpanErrorMapper().apply(uri, ex);
if (options.isLowLevelInstrumentation()) {
testing.waitAndAssertTraces(
testing.waitAndAssertSortedTraces(
comparingRootSpanAttribute(SemanticAttributes.HTTP_RESEND_COUNT),
IntStream.range(0, options.getMaxRedirects())
.mapToObj(i -> makeCircularRedirectAssertForLolLevelTrace(uri, method, i))
.collect(Collectors.toList()));
@ -397,7 +401,8 @@ public abstract class AbstractHttpClientTest<REQUEST> implements HttpClientTypeA
assertThat(responseCode).isEqualTo(200);
if (options.isLowLevelInstrumentation()) {
testing.waitAndAssertTraces(
testing.waitAndAssertSortedTraces(
comparingRootSpanAttribute(SemanticAttributes.HTTP_RESEND_COUNT),
trace -> {
trace.hasSpansSatisfyingExactly(
span ->

View File

@ -8,6 +8,7 @@ package io.opentelemetry.instrumentation.testing.util;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.data.SpanData;
@ -34,6 +35,13 @@ public final class TelemetryDataUtil {
return Comparator.comparing(span -> list.indexOf(span.get(0).getName()));
}
public static <T extends Comparable<T>> Comparator<List<SpanData>> comparingRootSpanAttribute(
AttributeKey<T> key) {
return Comparator.comparing(
span -> span.get(0).getAttributes().get(key),
Comparator.nullsFirst(Comparator.naturalOrder()));
}
public static List<List<SpanData>> groupTraces(List<SpanData> spans) {
List<List<SpanData>> traces =
new ArrayList<>(