From bf000fef98001ae6b9164777e25998bf872baaee Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 9 Aug 2023 18:05:42 +0300 Subject: [PATCH] Netty 4.1: handle closing connection before the request completes (#9157) --- .../HttpServerRequestTracingHandler.java | 20 ++++++ .../v5_0/server/SpringWebfluxTest.java | 66 +++++++++++++++++++ .../server/SpringWebFluxTestApplication.java | 20 +++++- 3 files changed, 105 insertions(+), 1 deletion(-) diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java index 28e81cd89c..2a198bae88 100644 --- a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/internal/server/HttpServerRequestTracingHandler.java @@ -75,6 +75,26 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte } } + @Override + public void channelInactive(ChannelHandlerContext ctx) { + // connection was closed, close all remaining requests + Attribute> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT); + Deque contexts = contextAttr.get(); + Attribute> requestAttr = ctx.channel().attr(HTTP_SERVER_REQUEST); + Deque requests = requestAttr.get(); + + if (contexts == null || requests == null) { + return; + } + + while (!contexts.isEmpty() || !requests.isEmpty()) { + Context context = contexts.pollFirst(); + HttpRequestAndChannel request = requests.pollFirst(); + + instrumenter.end(context, request, null, null); + } + } + private static Deque getOrCreate(Channel channel, AttributeKey> key) { Attribute> attribute = channel.attr(key); Deque deque = attribute.get(); diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java index dda2b4c8a0..f464145f47 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/webflux/v5_0/server/SpringWebfluxTest.java @@ -40,12 +40,15 @@ import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.testing.internal.armeria.client.WebClient; import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse; import io.opentelemetry.testing.internal.armeria.common.HttpStatus; +import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -831,6 +834,69 @@ public class SpringWebfluxTest { new FooModel(3L, "delayed").toString())))); } + @Test + void cancelRequestTest() throws Exception { + // fails with SingleThreadedSpringWebfluxTest + Assumptions.assumeTrue(this.getClass() == SpringWebfluxTest.class); + + WebClient client = + WebClient.builder("h1c://localhost:" + port) + .responseTimeout(Duration.ofSeconds(1)) + .followRedirects() + .build(); + try { + client.get("/slow").aggregate().get(); + } catch (ExecutionException ignore) { + // ignore + } + SpringWebFluxTestApplication.resumeSlowRequest(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("GET /slow") + .hasKind(SpanKind.SERVER) + .hasNoParent() + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + equalTo(NET_TRANSPORT, IP_TCP), + equalTo(NET_PROTOCOL_NAME, "http"), + equalTo(NET_PROTOCOL_VERSION, "1.1"), + equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"), + satisfies(NET_SOCK_PEER_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(NET_SOCK_HOST_ADDR, "127.0.0.1"), + equalTo(NET_HOST_NAME, "localhost"), + satisfies(NET_HOST_PORT, val -> val.isInstanceOf(Long.class)), + equalTo(HTTP_TARGET, "/slow"), + equalTo(HTTP_METHOD, "GET"), + equalTo(HTTP_SCHEME, "http"), + satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)), + equalTo(HTTP_ROUTE, "/slow"), + satisfies( + HTTP_REQUEST_CONTENT_LENGTH, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isInstanceOf(Long.class), + v -> assertThat(v).isNull())), + satisfies( + HTTP_RESPONSE_CONTENT_LENGTH, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isInstanceOf(Long.class), + v -> assertThat(v).isNull()))), + span -> + span.hasName("SpringWebFluxTestApplication$$Lambda$.handle") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + satisfies( + stringKey("spring-webflux.handler.type"), + value -> + value.startsWith( + "server.SpringWebFluxTestApplication$$Lambda$"))))); + } + private static class Parameter { Parameter( String urlPath, diff --git a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/SpringWebFluxTestApplication.java b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/SpringWebFluxTestApplication.java index 251bce2000..401388e234 100644 --- a/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/SpringWebFluxTestApplication.java +++ b/instrumentation/spring/spring-webflux/spring-webflux-5.0/javaagent/src/test/java/server/SpringWebFluxTestApplication.java @@ -12,6 +12,8 @@ import static org.springframework.web.reactive.function.server.RouterFunctions.r import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Tracer; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -31,6 +33,7 @@ import reactor.core.publisher.Mono; public class SpringWebFluxTestApplication { private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test"); + private static final CountDownLatch slowRequestLatch = new CountDownLatch(1); @Bean RouterFunction echoRouterFunction(EchoHandler echoHandler) { @@ -71,7 +74,22 @@ public class SpringWebFluxTestApplication { greetingHandler.intResponse( Mono.just(Integer.parseInt(request.pathVariable("id"))) .delayElement(Duration.ofMillis(100)) - .map(SpringWebFluxTestApplication::tracedMethod))); + .map(SpringWebFluxTestApplication::tracedMethod))) + .andRoute( + GET("/slow"), + request -> { + try { + slowRequestLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + return Mono.delay(Duration.ofMillis(100)) + .then(ServerResponse.ok().body(BodyInserters.fromObject("ok"))); + }); + } + + public static void resumeSlowRequest() { + slowRequestLatch.countDown(); } @Component