Upgrading to latest Netty version.

This is to resolve the issue of receiving frames for closed (and
therefore missing) streams.
This commit is contained in:
nmittler 2015-04-22 09:25:04 -07:00
parent fc3e41674b
commit 3ecbd0456a
4 changed files with 31 additions and 19 deletions

@ -1 +1 @@
Subproject commit e36c1436b80399175fad55d09848c9f29da2174e Subproject commit a7d1dc362ae0ab5998723aae120b36d606a31425

View File

@ -167,7 +167,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream)
throws Http2Exception { throws Http2Exception {
NettyClientStream stream = clientStream(connection().requireStream(streamId)); NettyClientStream stream = clientStream(requireHttp2Stream(streamId));
stream.transportHeadersReceived(headers, endStream); stream.transportHeadersReceived(headers, endStream);
} }
@ -175,8 +175,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
* Handler for an inbound HTTP/2 DATA frame. * Handler for an inbound HTTP/2 DATA frame.
*/ */
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
Http2Stream http2Stream = connection().requireStream(streamId); NettyClientStream stream = clientStream(requireHttp2Stream(streamId));
NettyClientStream stream = clientStream(http2Stream);
stream.transportDataReceived(data, endOfStream); stream.transportDataReceived(data, endOfStream);
} }
@ -186,8 +185,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
private void onRstStreamRead(int streamId) private void onRstStreamRead(int streamId)
throws Http2Exception { throws Http2Exception {
// TODO(nmittler): do something with errorCode? // TODO(nmittler): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId); NettyClientStream stream = clientStream(requireHttp2Stream(streamId));
NettyClientStream stream = clientStream(http2Stream);
stream.transportReportStatus(Status.UNKNOWN, false, new Metadata.Trailers()); stream.transportReportStatus(Status.UNKNOWN, false, new Metadata.Trailers());
} }
@ -360,15 +358,24 @@ class NettyClientHandler extends Http2ConnectionHandler {
return id; return id;
} }
private Http2Stream requireHttp2Stream(int streamId) {
Http2Stream stream = connection().stream(streamId);
if (stream == null) {
// This should never happen.
throw new AssertionError("Stream does not exist: " + streamId);
}
return stream;
}
/** /**
* Initializes the connection window if we haven't already. * Initializes the connection window if we haven't already.
*/ */
private void initConnectionWindow() throws Http2Exception { private void initConnectionWindow() throws Http2Exception {
if (connectionWindowSize > 0 && ctx.channel().isActive()) { if (connectionWindowSize > 0 && ctx.channel().isActive()) {
Http2Stream stream = connection().connectionStream(); Http2Stream connectionStream = connection().connectionStream();
int currentSize = decoder().flowController().windowSize(stream); int currentSize = connectionStream.localFlowState().windowSize();
int delta = connectionWindowSize - currentSize; int delta = connectionWindowSize - currentSize;
decoder().flowController().incrementWindowSize(ctx, stream, delta); decoder().flowController().incrementWindowSize(ctx, connectionStream, delta);
connectionWindowSize = -1; connectionWindowSize = -1;
} }
} }

View File

@ -136,7 +136,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
try { try {
// The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
// method. // method.
Http2Stream http2Stream = connection().requireStream(streamId); Http2Stream http2Stream = requireHttp2Stream(streamId);
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this); NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this);
http2Stream.setProperty(NettyServerStream.class, stream); http2Stream.setProperty(NettyServerStream.class, stream);
String method = determineMethod(streamId, headers); String method = determineMethod(streamId, headers);
@ -153,10 +153,8 @@ class NettyServerHandler extends Http2ConnectionHandler {
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
try { try {
NettyServerStream stream = serverStream(connection().requireStream(streamId)); NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
stream.inboundDataReceived(data, endOfStream); stream.inboundDataReceived(data, endOfStream);
} catch (Http2Exception e) {
throw e;
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onDataRead()", e); logger.log(Level.WARNING, "Exception in onDataRead()", e);
throw newStreamException(streamId, e); throw newStreamException(streamId, e);
@ -165,10 +163,8 @@ class NettyServerHandler extends Http2ConnectionHandler {
private void onRstStreamRead(int streamId) throws Http2Exception { private void onRstStreamRead(int streamId) throws Http2Exception {
try { try {
NettyServerStream stream = serverStream(connection().requireStream(streamId)); NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
stream.abortStream(Status.CANCELLED, false); stream.abortStream(Status.CANCELLED, false);
} catch (Http2Exception e) {
throw e;
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
throw newStreamException(streamId, e); throw newStreamException(streamId, e);
@ -249,7 +245,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
} }
private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception { private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
final NettyServerStream stream = serverStream(connection().requireStream(streamId)); final NettyServerStream stream = serverStream(requireHttp2Stream(streamId));
promise.addListener(new ChannelFutureListener() { promise.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) {
@ -307,6 +303,15 @@ class NettyServerHandler extends Http2ConnectionHandler {
}); });
} }
private Http2Stream requireHttp2Stream(int streamId) {
Http2Stream stream = connection().stream(streamId);
if (stream == null) {
// This should never happen.
throw new AssertionError("Stream does not exist: " + streamId);
}
return stream;
}
private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception { private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception {
if (!HTTP_METHOD.equals(headers.method())) { if (!HTTP_METHOD.equals(headers.method())) {
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM, throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,

View File

@ -357,8 +357,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
int connectionWindow = 1048576; // 1MiB int connectionWindow = 1048576; // 1MiB
handler = newHandler(connectionWindow, DEFAULT_WINDOW_SIZE); handler = newHandler(connectionWindow, DEFAULT_WINDOW_SIZE);
handler.handlerAdded(ctx); handler.handlerAdded(ctx);
assertEquals(connectionWindow, assertEquals(connectionWindow, handler.connection().connectionStream().localFlowState()
handler.decoder().flowController().windowSize(handler.connection().connectionStream())); .windowSize());
} }
@Test @Test