End reactor-netty HTTP client span properly on `Mono#timeout()` (#6891)

Calling `Mono#timeout()` with a timeout value smaller than the HTTP
client timeout caused the on request/response end callbacks to be simply
discarded; and the HTTP span was never finished.
This commit is contained in:
Mateusz Rzeszutek 2022-10-17 22:29:02 +02:00 committed by GitHub
parent b25283d2ca
commit b6ded1f9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 19 deletions

View File

@ -13,6 +13,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry; import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator; import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Function; import java.util.function.Function;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -58,8 +59,20 @@ public final class HttpResponseReceiverInstrumenter {
} }
static final class ContextHolder { static final class ContextHolder {
private static final AtomicReferenceFieldUpdater<ContextHolder, Context> contextUpdater =
AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context");
volatile Context parentContext; volatile Context parentContext;
volatile Context context; volatile Context context;
void setContext(Context context) {
contextUpdater.set(this, context);
}
Context getAndRemoveContext() {
return contextUpdater.getAndSet(this, null);
}
} }
static final class StartOperation static final class StartOperation
@ -76,23 +89,33 @@ public final class HttpResponseReceiverInstrumenter {
@Override @Override
public Mono<? extends Connection> apply(Mono<? extends Connection> mono) { public Mono<? extends Connection> apply(Mono<? extends Connection> mono) {
return Mono.defer( return Mono.defer(
() -> { () -> {
Context parentContext = Context.current(); Context parentContext = Context.current();
contextHolder.parentContext = parentContext; contextHolder.parentContext = parentContext;
if (!instrumenter().shouldStart(parentContext, config)) { if (!instrumenter().shouldStart(parentContext, config)) {
// make context accessible via the reactor ContextView - the doOn* callbacks // make context accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses this to set the proper context for callbacks // instrumentation uses this to set the proper context for callbacks
return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); return mono.contextWrite(
} ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext));
}
Context context = instrumenter().start(parentContext, config); Context context = instrumenter().start(parentContext, config);
contextHolder.context = context; contextHolder.setContext(context);
return ContextPropagationOperator.runWithContext(mono, context) return ContextPropagationOperator.runWithContext(mono, context)
// make contexts accessible via the reactor ContextView - the doOn* callbacks // make contexts accessible via the reactor ContextView - the doOn* callbacks
// instrumentation uses the parent context to set the proper context for callbacks // instrumentation uses the parent context to set the proper context for
.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) // callbacks
.contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext))
}); .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context));
})
.doOnCancel(
() -> {
Context context = contextHolder.getAndRemoveContext();
if (context == null) {
return;
}
instrumenter().end(context, config, null, null);
});
} }
} }
@ -134,7 +157,7 @@ public final class HttpResponseReceiverInstrumenter {
@Override @Override
public void accept(HttpClientRequest httpClientRequest, Throwable error) { public void accept(HttpClientRequest httpClientRequest, Throwable error) {
Context context = contextHolder.context; Context context = contextHolder.getAndRemoveContext();
if (context == null) { if (context == null) {
return; return;
} }
@ -155,7 +178,7 @@ public final class HttpResponseReceiverInstrumenter {
@Override @Override
public void accept(HttpClientResponse response, Throwable error) { public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.context; Context context = contextHolder.getAndRemoveContext();
if (context == null) { if (context == null) {
return; return;
} }
@ -175,7 +198,7 @@ public final class HttpResponseReceiverInstrumenter {
@Override @Override
public void accept(HttpClientResponse response, Connection connection) { public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.context; Context context = contextHolder.getAndRemoveContext();
if (context == null) { if (context == null) {
return; return;
} }

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL; import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.api.trace.SpanKind.SERVER; import static io.opentelemetry.api.trace.SpanKind.SERVER;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.catchThrowable; import static org.assertj.core.api.Assertions.catchThrowable;
@ -17,6 +18,7 @@ import io.netty.resolver.AddressResolverGroup;
import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
@ -27,6 +29,7 @@ import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.time.Duration;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -260,6 +263,55 @@ abstract class AbstractReactorNettyHttpClientTest
assertThat(uniqueChannelHashes).hasSize(1); assertThat(uniqueChannelHashes).hasSize(1);
} }
@Test
void shouldEndSpanOnMonoTimeout() {
HttpClient httpClient = createHttpClient();
URI uri = resolveAddress("/read-timeout");
Throwable thrown =
catchThrowable(
() ->
testing.runWithSpan(
"parent",
() ->
httpClient
.get()
.uri(uri)
.responseSingle(
(resp, content) -> {
// Make sure to consume content since that's when we close the
// span.
return content.map(unused -> resp);
})
// apply Mono timeout that is way shorter than HTTP request timeout
.timeout(Duration.ofSeconds(1))
.block()));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasKind(SpanKind.INTERNAL)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(thrown),
span ->
span.hasName("HTTP GET")
.hasKind(CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_METHOD, "GET"),
equalTo(SemanticAttributes.HTTP_URL, uri.toString()),
equalTo(SemanticAttributes.HTTP_USER_AGENT, USER_AGENT),
equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"),
equalTo(SemanticAttributes.NET_PEER_PORT, uri.getPort())),
span ->
span.hasName("test-http-server")
.hasKind(SpanKind.SERVER)
.hasParent(trace.getSpan(1))));
}
private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) { private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
SpanContext expectedSpanContext = expected.getSpanContext(); SpanContext expectedSpanContext = expected.getSpanContext();
SpanContext actualSpanContext = actual.get().getSpanContext(); SpanContext actualSpanContext = actual.get().getSpanContext();