From 96f9cefda41bbbf1462bfe607864d451bc716219 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 4 Dec 2015 16:04:03 -0800 Subject: [PATCH] Close active streams on channelInactive We think this broke when the stream lifecycle listener was removed. Observing the stream lifecycle would be the "proper" fix, but it had notification ordering issues where streams would close before we were notified of the event that caused the closure, which made it difficult to provide useful error messages. The ordering of notifications was also largely undefined. The long term fix we look forward to is the HTTP/2 child channels, which should have clearly defined ordering between error notification and channel closure, and in the order that we need here. Fixes #1251 --- .../io/grpc/netty/NettyServerHandler.java | 25 +++++++++++-------- .../io/grpc/netty/NettyServerHandlerTest.java | 9 +++++++ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index e7d6ea2dd9..06b5a3fb9f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -261,18 +261,21 @@ class NettyServerHandler extends AbstractNettyHandler { */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - // Any streams that are still active must be closed - connection().forEachActiveStream(new Http2StreamVisitor() { - @Override - public boolean visit(Http2Stream stream) throws Http2Exception { - NettyServerStream serverStream = serverStream(stream); - if (serverStream != null) { - serverStream.abortStream(GOAWAY_STATUS, false); + try { + // Any streams that are still active must be closed + connection().forEachActiveStream(new Http2StreamVisitor() { + @Override + public boolean visit(Http2Stream stream) throws Http2Exception { + NettyServerStream serverStream = serverStream(stream); + if (serverStream != null) { + serverStream.abortStream(GOAWAY_STATUS, false); + } + return true; } - return true; - } - }); + }); + } finally { + super.channelInactive(ctx); + } } WriteQueue getWriteQueue() { diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 87078c2a4d..52ec3fd402 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -237,6 +237,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(Status.class); + verify(streamListener).closed(captor.capture()); + assertFalse(captor.getValue().isOk()); + } + @Test public void shouldAdvertiseMaxConcurrentStreams() throws Exception { maxConcurrentStreams = 314;