diff --git a/lib/netty b/lib/netty index e36c1436b8..a7d1dc362a 160000 --- a/lib/netty +++ b/lib/netty @@ -1 +1 @@ -Subproject commit e36c1436b80399175fad55d09848c9f29da2174e +Subproject commit a7d1dc362ae0ab5998723aae120b36d606a31425 diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java index ebfb381a54..55f2d0cf98 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java @@ -167,7 +167,7 @@ class NettyClientHandler extends Http2ConnectionHandler { private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) throws Http2Exception { - NettyClientStream stream = clientStream(connection().requireStream(streamId)); + NettyClientStream stream = clientStream(requireHttp2Stream(streamId)); stream.transportHeadersReceived(headers, endStream); } @@ -175,8 +175,7 @@ class NettyClientHandler extends Http2ConnectionHandler { * Handler for an inbound HTTP/2 DATA frame. */ private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { - Http2Stream http2Stream = connection().requireStream(streamId); - NettyClientStream stream = clientStream(http2Stream); + NettyClientStream stream = clientStream(requireHttp2Stream(streamId)); stream.transportDataReceived(data, endOfStream); } @@ -186,8 +185,7 @@ class NettyClientHandler extends Http2ConnectionHandler { private void onRstStreamRead(int streamId) throws Http2Exception { // TODO(nmittler): do something with errorCode? - Http2Stream http2Stream = connection().requireStream(streamId); - NettyClientStream stream = clientStream(http2Stream); + NettyClientStream stream = clientStream(requireHttp2Stream(streamId)); stream.transportReportStatus(Status.UNKNOWN, false, new Metadata.Trailers()); } @@ -360,15 +358,24 @@ class NettyClientHandler extends Http2ConnectionHandler { return id; } + private Http2Stream requireHttp2Stream(int streamId) { + Http2Stream stream = connection().stream(streamId); + if (stream == null) { + // This should never happen. + throw new AssertionError("Stream does not exist: " + streamId); + } + return stream; + } + /** * Initializes the connection window if we haven't already. */ private void initConnectionWindow() throws Http2Exception { if (connectionWindowSize > 0 && ctx.channel().isActive()) { - Http2Stream stream = connection().connectionStream(); - int currentSize = decoder().flowController().windowSize(stream); + Http2Stream connectionStream = connection().connectionStream(); + int currentSize = connectionStream.localFlowState().windowSize(); int delta = connectionWindowSize - currentSize; - decoder().flowController().incrementWindowSize(ctx, stream, delta); + decoder().flowController().incrementWindowSize(ctx, connectionStream, delta); connectionWindowSize = -1; } } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java index 0481e0036f..89fca0017f 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java @@ -136,7 +136,7 @@ class NettyServerHandler extends Http2ConnectionHandler { try { // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this // method. - Http2Stream http2Stream = connection().requireStream(streamId); + Http2Stream http2Stream = requireHttp2Stream(streamId); NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this); http2Stream.setProperty(NettyServerStream.class, stream); String method = determineMethod(streamId, headers); @@ -153,10 +153,8 @@ class NettyServerHandler extends Http2ConnectionHandler { private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { try { - NettyServerStream stream = serverStream(connection().requireStream(streamId)); + NettyServerStream stream = serverStream(requireHttp2Stream(streamId)); stream.inboundDataReceived(data, endOfStream); - } catch (Http2Exception e) { - throw e; } catch (Throwable e) { logger.log(Level.WARNING, "Exception in onDataRead()", e); throw newStreamException(streamId, e); @@ -165,10 +163,8 @@ class NettyServerHandler extends Http2ConnectionHandler { private void onRstStreamRead(int streamId) throws Http2Exception { try { - NettyServerStream stream = serverStream(connection().requireStream(streamId)); + NettyServerStream stream = serverStream(requireHttp2Stream(streamId)); stream.abortStream(Status.CANCELLED, false); - } catch (Http2Exception e) { - throw e; } catch (Throwable e) { logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); throw newStreamException(streamId, e); @@ -249,7 +245,7 @@ class NettyServerHandler extends Http2ConnectionHandler { } private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception { - final NettyServerStream stream = serverStream(connection().requireStream(streamId)); + final NettyServerStream stream = serverStream(requireHttp2Stream(streamId)); promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { @@ -307,6 +303,15 @@ class NettyServerHandler extends Http2ConnectionHandler { }); } + private Http2Stream requireHttp2Stream(int streamId) { + Http2Stream stream = connection().stream(streamId); + if (stream == null) { + // This should never happen. + throw new AssertionError("Stream does not exist: " + streamId); + } + return stream; + } + private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception { if (!HTTP_METHOD.equals(headers.method())) { throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM, diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java index 80ca50a6b3..33ea779c1c 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java @@ -357,8 +357,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { int connectionWindow = 1048576; // 1MiB handler = newHandler(connectionWindow, DEFAULT_WINDOW_SIZE); handler.handlerAdded(ctx); - assertEquals(connectionWindow, - handler.decoder().flowController().windowSize(handler.connection().connectionStream())); + assertEquals(connectionWindow, handler.connection().connectionStream().localFlowState() + .windowSize()); } @Test