From d5727c7fcd68615f7f092a7a690e23038ef05fb8 Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 12 May 2015 14:02:29 -0700 Subject: [PATCH] Deferring stream creation until receiving SETTINGS from server. Additionally: - Fixed bug where the decoder was given the incorrect encoder. - Adding proper logging class for client/server. --- .../BufferingHttp2ConnectionEncoder.java | 23 +++++++-- .../transport/netty/NettyClientHandler.java | 6 +-- .../transport/netty/NettyClientTransport.java | 7 ++- .../transport/netty/NettyServerTransport.java | 2 +- .../BufferingHttp2ConnectionEncoderTest.java | 49 ++++++++++++++++++- .../netty/NettyClientHandlerTest.java | 4 +- 6 files changed, 77 insertions(+), 14 deletions(-) diff --git a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java index 9592c69392..5ac8c83c2b 100644 --- a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java +++ b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java @@ -40,6 +40,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder; import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Headers; @@ -66,6 +67,11 @@ import java.util.TreeMap; * in replacement for {@link io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder}. */ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { + /** + * The number of new streams we allow to be created before receiving the first {@code SETTINGS} + * frame from the server. + */ + private static final int NUM_STREAMS_INITIALLY_ALLOWED = 10; /** * Buffer for any streams and corresponding frames that could not be created @@ -75,6 +81,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { new TreeMap(); // Smallest stream id whose corresponding frames do not get buffered. private int largestCreatedStreamId; + private boolean receivedSettings; protected BufferingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) { super(delegate); @@ -103,7 +110,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { return writeHeaders(ctx, streamId, headers, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false, - padding, endStream, promise); + padding, endStream, promise); } @Override @@ -114,7 +121,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream, promise); } - if (connection().local().canCreateStream()) { + if (canCreateStream()) { assert streamId > largestCreatedStreamId; largestCreatedStreamId = streamId; return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, @@ -170,6 +177,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { + receivedSettings = true; ChannelFuture future = super.writeSettingsAck(ctx, promise); // After having received a SETTINGS frame, the maximum number of concurrent streams // might have changed. So try to create some buffered streams. @@ -184,7 +192,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { } private void tryCreatePendingStreams() { - while (!pendingStreams.isEmpty() && connection().local().canCreateStream()) { + while (!pendingStreams.isEmpty() && canCreateStream()) { Map.Entry entry = pendingStreams.pollFirstEntry(); PendingStream pendingStream = entry.getValue(); pendingStream.sendFrames(); @@ -212,6 +220,15 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { } } + /** + * Determines whether or not we're allowed to create a new stream right now. + */ + private boolean canCreateStream() { + Http2Connection.Endpoint local = connection().local(); + return (receivedSettings || local.numActiveStreams() < NUM_STREAMS_INITIALLY_ALLOWED) + && local.canCreateStream(); + } + private boolean existingStream(int streamId) { return streamId <= largestCreatedStreamId; } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java index a3293443c0..623e3d986b 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java @@ -38,6 +38,7 @@ import com.google.common.base.Preconditions; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.transport.HttpUtil; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -46,7 +47,6 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; -import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; @@ -75,11 +75,11 @@ class NettyClientHandler extends Http2ConnectionHandler { private ChannelHandlerContext ctx; private int nextStreamId; - public NettyClientHandler(Http2ConnectionEncoder encoder, Http2Connection connection, + public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection, Http2FrameReader frameReader, int connectionWindowSize, int streamWindowSize) { super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, - new LazyFrameListener()), new BufferingHttp2ConnectionEncoder(encoder)); + new LazyFrameListener()), encoder); Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); this.connectionWindowSize = connectionWindowSize; try { diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index 5c7d620915..86bd7b43e4 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -258,13 +258,12 @@ class NettyClientTransport implements ClientTransport { Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG); + Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, getClass()); frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); - DefaultHttp2ConnectionEncoder encoder = - new DefaultHttp2ConnectionEncoder(connection, frameWriter); - + BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder( + new DefaultHttp2ConnectionEncoder(connection, frameWriter)); return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, streamWindowSize); } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java index af85a00780..239f0346c1 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java @@ -129,7 +129,7 @@ class NettyServerTransport implements ServerTransport { */ private NettyServerHandler createHandler(ServerTransportListener transportListener) { Http2Connection connection = new DefaultHttp2Connection(true); - Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG); + Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, getClass()); Http2FrameReader frameReader = new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), frameLogger); Http2FrameWriter frameWriter = diff --git a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java index a8e6bee5f9..6d9b9ef1ae 100644 --- a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java @@ -111,7 +111,7 @@ public class BufferingHttp2ConnectionEncoderTest { when(configuration.frameSizePolicy()).thenReturn(frameSizePolicy); when(frameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE); when(writer.writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise))).thenAnswer( - successAnswer()); + successAnswer()); when(writer.writeGoAway(eq(ctx), anyInt(), anyLong(), any(ByteBuf.class), eq(promise))) .thenAnswer(successAnswer()); @@ -132,6 +132,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void multipleWritesToActiveStream() { + encoder.writeSettingsAck(ctx, promise); encoderWriteHeaders(3, promise); assertEquals(0, encoder.numBufferedStreams()); encoder.writeData(ctx, 3, data(), 0, false, promise); @@ -146,6 +147,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void ensureCanCreateNextStreamWhenStreamCloses() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(1); encoderWriteHeaders(3, promise); @@ -172,6 +174,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void alternatingWritesToActiveAndBufferedStreams() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(1); encoderWriteHeaders(3, promise); @@ -190,6 +193,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void bufferingNewStreamFailsAfterGoAwayReceived() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(0); connection.goAwayReceived(1, 8, null); @@ -201,6 +205,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void receivingGoAwayFailsBufferedStreams() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(5); int streamId = 3; @@ -220,6 +225,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void sendingGoAwayShouldNotFailStreams() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(1); encoderWriteHeaders(3, promise); @@ -239,6 +245,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void endStreamDoesNotFailBufferedStream() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(0); encoderWriteHeaders(3, promise); @@ -262,6 +269,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void rstStreamClosesBufferedStream() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(0); encoderWriteHeaders(3, promise); @@ -277,6 +285,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void bufferUntilActiveStreamsAreReset() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(1); encoderWriteHeaders(3, promise); @@ -303,6 +312,7 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void bufferUntilMaxStreamsIncreased() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(2); encoderWriteHeaders(3, promise); @@ -332,8 +342,45 @@ public class BufferingHttp2ConnectionEncoderTest { assertEquals(5, connection.local().numActiveStreams()); } + @Test + public void bufferUntilSettingsReceived() { + encoderWriteHeaders(3, promise); + encoderWriteHeaders(5, promise); + encoderWriteHeaders(7, promise); + encoderWriteHeaders(9, promise); + encoderWriteHeaders(11, promise); + encoderWriteHeaders(13, promise); + encoderWriteHeaders(15, promise); + encoderWriteHeaders(17, promise); + encoderWriteHeaders(19, promise); + encoderWriteHeaders(21, promise); + encoderWriteHeaders(23, promise); + assertEquals(1, encoder.numBufferedStreams()); + + writeVerifyWriteHeaders(times(1), 3, promise); + writeVerifyWriteHeaders(times(1), 5, promise); + writeVerifyWriteHeaders(times(1), 7, promise); + writeVerifyWriteHeaders(times(1), 9, promise); + writeVerifyWriteHeaders(times(1), 11, promise); + writeVerifyWriteHeaders(times(1), 13, promise); + writeVerifyWriteHeaders(times(1), 15, promise); + writeVerifyWriteHeaders(times(1), 17, promise); + writeVerifyWriteHeaders(times(1), 19, promise); + writeVerifyWriteHeaders(times(1), 21, promise); + writeVerifyWriteHeaders(never(), 23, promise); + + // Simulate that we received a SETTINGS frame. + encoder.writeSettingsAck(ctx, promise); + + assertEquals(0, encoder.numBufferedStreams()); + writeVerifyWriteHeaders(times(1), 23, promise); + + assertEquals(11, connection.local().numActiveStreams()); + } + @Test public void closedBufferedStreamReleasesByteBuf() { + encoder.writeSettingsAck(ctx, promise); connection.local().maxActiveStreams(0); ByteBuf data = mock(ByteBuf.class); encoderWriteHeaders(3, promise); diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java index e6cdc45d7b..3e2325aaa2 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java @@ -401,8 +401,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { Http2Connection connection = new DefaultHttp2Connection(false); Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - DefaultHttp2ConnectionEncoder encoder = - new DefaultHttp2ConnectionEncoder(connection, frameWriter); + BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder( + new DefaultHttp2ConnectionEncoder(connection, frameWriter)); return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, streamWindowSize); }