mirror of https://github.com/grpc/grpc-java.git
Close active streams on channelInactive
We think this broke when the stream lifecycle listener was removed. Observing the stream lifecycle would be the "proper" fix, but it had notification ordering issues where streams would close before we were notified of the event that caused the closure, which made it difficult to provide useful error messages. The ordering of notifications was also largely undefined. The long term fix we look forward to is the HTTP/2 child channels, which should have clearly defined ordering between error notification and channel closure, and in the order that we need here. Fixes #1251
This commit is contained in:
parent
8d0b5b0c4d
commit
96f9cefda4
|
|
@ -261,18 +261,21 @@ class NettyServerHandler extends AbstractNettyHandler {
|
|||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelInactive(ctx);
|
||||
// Any streams that are still active must be closed
|
||||
connection().forEachActiveStream(new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||
NettyServerStream serverStream = serverStream(stream);
|
||||
if (serverStream != null) {
|
||||
serverStream.abortStream(GOAWAY_STATUS, false);
|
||||
try {
|
||||
// Any streams that are still active must be closed
|
||||
connection().forEachActiveStream(new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||
NettyServerStream serverStream = serverStream(stream);
|
||||
if (serverStream != null) {
|
||||
serverStream.abortStream(GOAWAY_STATUS, false);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
});
|
||||
} finally {
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
WriteQueue getWriteQueue() {
|
||||
|
|
|
|||
|
|
@ -237,6 +237,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
|
|||
assertFalse(channel().isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void channelInactiveShouldCloseStreams() throws Exception {
|
||||
createStream();
|
||||
handler().channelInactive(ctx());
|
||||
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||
verify(streamListener).closed(captor.capture());
|
||||
assertFalse(captor.getValue().isOk());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldAdvertiseMaxConcurrentStreams() throws Exception {
|
||||
maxConcurrentStreams = 314;
|
||||
|
|
|
|||
Loading…
Reference in New Issue