Netty 4.1: handle closing connection before the request completes (#9157)

This commit is contained in:
Lauri Tulmin 2023-08-09 18:05:42 +03:00 committed by GitHub
parent a332eb2887
commit bf000fef98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 1 deletions

View File

@ -75,6 +75,26 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// connection was closed, close all remaining requests
Attribute<Deque<Context>> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);
Deque<Context> contexts = contextAttr.get();
Attribute<Deque<HttpRequestAndChannel>> requestAttr = ctx.channel().attr(HTTP_SERVER_REQUEST);
Deque<HttpRequestAndChannel> requests = requestAttr.get();
if (contexts == null || requests == null) {
return;
}
while (!contexts.isEmpty() || !requests.isEmpty()) {
Context context = contexts.pollFirst();
HttpRequestAndChannel request = requests.pollFirst();
instrumenter.end(context, request, null, null);
}
}
private static <T> Deque<T> getOrCreate(Channel channel, AttributeKey<Deque<T>> key) {
Attribute<Deque<T>> attribute = channel.attr(key);
Deque<T> deque = attribute.get();

View File

@ -40,12 +40,15 @@ import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.testing.internal.armeria.client.WebClient;
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -831,6 +834,69 @@ public class SpringWebfluxTest {
new FooModel(3L, "delayed").toString()))));
}
@Test
void cancelRequestTest() throws Exception {
// fails with SingleThreadedSpringWebfluxTest
Assumptions.assumeTrue(this.getClass() == SpringWebfluxTest.class);
WebClient client =
WebClient.builder("h1c://localhost:" + port)
.responseTimeout(Duration.ofSeconds(1))
.followRedirects()
.build();
try {
client.get("/slow").aggregate().get();
} catch (ExecutionException ignore) {
// ignore
}
SpringWebFluxTestApplication.resumeSlowRequest();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("GET /slow")
.hasKind(SpanKind.SERVER)
.hasNoParent()
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
equalTo(NET_TRANSPORT, IP_TCP),
equalTo(NET_PROTOCOL_NAME, "http"),
equalTo(NET_PROTOCOL_VERSION, "1.1"),
equalTo(NET_SOCK_PEER_ADDR, "127.0.0.1"),
satisfies(NET_SOCK_PEER_PORT, val -> val.isInstanceOf(Long.class)),
equalTo(NET_SOCK_HOST_ADDR, "127.0.0.1"),
equalTo(NET_HOST_NAME, "localhost"),
satisfies(NET_HOST_PORT, val -> val.isInstanceOf(Long.class)),
equalTo(HTTP_TARGET, "/slow"),
equalTo(HTTP_METHOD, "GET"),
equalTo(HTTP_SCHEME, "http"),
satisfies(USER_AGENT_ORIGINAL, val -> val.isInstanceOf(String.class)),
equalTo(HTTP_ROUTE, "/slow"),
satisfies(
HTTP_REQUEST_CONTENT_LENGTH,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isInstanceOf(Long.class),
v -> assertThat(v).isNull())),
satisfies(
HTTP_RESPONSE_CONTENT_LENGTH,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isInstanceOf(Long.class),
v -> assertThat(v).isNull()))),
span ->
span.hasName("SpringWebFluxTestApplication$$Lambda$.handle")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
satisfies(
stringKey("spring-webflux.handler.type"),
value ->
value.startsWith(
"server.SpringWebFluxTestApplication$$Lambda$")))));
}
private static class Parameter {
Parameter(
String urlPath,

View File

@ -12,6 +12,8 @@ import static org.springframework.web.reactive.function.server.RouterFunctions.r
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@ -31,6 +33,7 @@ import reactor.core.publisher.Mono;
public class SpringWebFluxTestApplication {
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test");
private static final CountDownLatch slowRequestLatch = new CountDownLatch(1);
@Bean
RouterFunction<ServerResponse> echoRouterFunction(EchoHandler echoHandler) {
@ -71,7 +74,22 @@ public class SpringWebFluxTestApplication {
greetingHandler.intResponse(
Mono.just(Integer.parseInt(request.pathVariable("id")))
.delayElement(Duration.ofMillis(100))
.map(SpringWebFluxTestApplication::tracedMethod)));
.map(SpringWebFluxTestApplication::tracedMethod)))
.andRoute(
GET("/slow"),
request -> {
try {
slowRequestLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return Mono.delay(Duration.ofMillis(100))
.then(ServerResponse.ok().body(BodyInserters.fromObject("ok")));
});
}
public static void resumeSlowRequest() {
slowRequestLatch.countDown();
}
@Component