Correctly handle disconnect notifications from Netty.

Previously the code close the HTTP2 streams prior to notifying the
application layer. This was the wrong order as the code depended on
enumerating the open streams to notify the application layer.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82733459
This commit is contained in:
lryan 2014-12-23 11:09:18 -08:00 committed by Eric Anderson
parent ca749c369f
commit b09c26b4dd
2 changed files with 34 additions and 8 deletions

View File

@ -189,15 +189,18 @@ class NettyClientHandler extends Http2ConnectionHandler {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
try {
// Fail any streams that are awaiting creation.
Status goAwayStatus = goAwayStatus().withDescription("network channel closed");
failPendingStreams(goAwayStatus);
// Fail any streams that are awaiting creation.
Status goAwayStatus = goAwayStatus();
failPendingStreams(goAwayStatus);
// Any streams that are still active must be closed.
for (Http2Stream stream : http2Streams()) {
clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers());
// Report status to the application layer for any open streams
for (Http2Stream stream : http2Streams()) {
clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers());
}
} finally {
// Close any open streams
super.channelInactive(ctx);
}
}

View File

@ -263,6 +263,29 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
}
@Test
public void channelShutdownShouldFailQueuedStreams() throws Exception {
// Force a stream to get added to the pending queue.
setMaxConcurrentStreams(0);
handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream),
promise);
handler.channelInactive(ctx);
verify(promise).setFailure(any(Throwable.class));
}
@Test
public void channelShutdownShouldFailInFlightStreams() throws Exception {
createStream();
handler.channelInactive(ctx);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
InOrder inOrder = inOrder(stream);
inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(),
notNull(Metadata.Trailers.class));
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
}
private void setMaxConcurrentStreams(int max) throws Exception {
ByteBuf serializedSettings = serializeSettings(new Http2Settings().maxConcurrentStreams(max));
handler.channelRead(ctx, serializedSettings);