Deferring stream creation until receiving SETTINGS from server.

Additionally:
- Fixed bug where the decoder was given the incorrect encoder.
- Adding proper logging class for client/server.
This commit is contained in:
nmittler 2015-05-12 14:02:29 -07:00
parent 2cdc5e3c47
commit d5727c7fcd
6 changed files with 77 additions and 14 deletions

View File

@ -40,6 +40,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
@ -66,6 +67,11 @@ import java.util.TreeMap;
* in replacement for {@link io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder}. * in replacement for {@link io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder}.
*/ */
class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
/**
* The number of new streams we allow to be created before receiving the first {@code SETTINGS}
* frame from the server.
*/
private static final int NUM_STREAMS_INITIALLY_ALLOWED = 10;
/** /**
* Buffer for any streams and corresponding frames that could not be created * Buffer for any streams and corresponding frames that could not be created
@ -75,6 +81,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
new TreeMap<Integer, PendingStream>(); new TreeMap<Integer, PendingStream>();
// Smallest stream id whose corresponding frames do not get buffered. // Smallest stream id whose corresponding frames do not get buffered.
private int largestCreatedStreamId; private int largestCreatedStreamId;
private boolean receivedSettings;
protected BufferingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) { protected BufferingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
super(delegate); super(delegate);
@ -103,7 +110,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, ChannelPromise promise) { int padding, boolean endStream, ChannelPromise promise) {
return writeHeaders(ctx, streamId, headers, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false, return writeHeaders(ctx, streamId, headers, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false,
padding, endStream, promise); padding, endStream, promise);
} }
@Override @Override
@ -114,7 +121,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise); exclusive, padding, endOfStream, promise);
} }
if (connection().local().canCreateStream()) { if (canCreateStream()) {
assert streamId > largestCreatedStreamId; assert streamId > largestCreatedStreamId;
largestCreatedStreamId = streamId; largestCreatedStreamId = streamId;
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
@ -170,6 +177,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
@Override @Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
receivedSettings = true;
ChannelFuture future = super.writeSettingsAck(ctx, promise); ChannelFuture future = super.writeSettingsAck(ctx, promise);
// After having received a SETTINGS frame, the maximum number of concurrent streams // After having received a SETTINGS frame, the maximum number of concurrent streams
// might have changed. So try to create some buffered streams. // might have changed. So try to create some buffered streams.
@ -184,7 +192,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
} }
private void tryCreatePendingStreams() { private void tryCreatePendingStreams() {
while (!pendingStreams.isEmpty() && connection().local().canCreateStream()) { while (!pendingStreams.isEmpty() && canCreateStream()) {
Map.Entry<Integer, PendingStream> entry = pendingStreams.pollFirstEntry(); Map.Entry<Integer, PendingStream> entry = pendingStreams.pollFirstEntry();
PendingStream pendingStream = entry.getValue(); PendingStream pendingStream = entry.getValue();
pendingStream.sendFrames(); pendingStream.sendFrames();
@ -212,6 +220,15 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
} }
} }
/**
* Determines whether or not we're allowed to create a new stream right now.
*/
private boolean canCreateStream() {
Http2Connection.Endpoint local = connection().local();
return (receivedSettings || local.numActiveStreams() < NUM_STREAMS_INITIALLY_ALLOWED)
&& local.canCreateStream();
}
private boolean existingStream(int streamId) { private boolean existingStream(int streamId) {
return streamId <= largestCreatedStreamId; return streamId <= largestCreatedStreamId;
} }

View File

@ -38,6 +38,7 @@ import com.google.common.base.Preconditions;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.transport.HttpUtil; import io.grpc.transport.HttpUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -46,7 +47,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
@ -75,11 +75,11 @@ class NettyClientHandler extends Http2ConnectionHandler {
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private int nextStreamId; private int nextStreamId;
public NettyClientHandler(Http2ConnectionEncoder encoder, Http2Connection connection, public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection,
Http2FrameReader frameReader, Http2FrameReader frameReader,
int connectionWindowSize, int streamWindowSize) { int connectionWindowSize, int streamWindowSize) {
super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
new LazyFrameListener()), new BufferingHttp2ConnectionEncoder(encoder)); new LazyFrameListener()), encoder);
Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive");
this.connectionWindowSize = connectionWindowSize; this.connectionWindowSize = connectionWindowSize;
try { try {

View File

@ -258,13 +258,12 @@ class NettyClientTransport implements ClientTransport {
Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, getClass());
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
DefaultHttp2ConnectionEncoder encoder = BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter); new DefaultHttp2ConnectionEncoder(connection, frameWriter));
return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize,
streamWindowSize); streamWindowSize);
} }

View File

@ -129,7 +129,7 @@ class NettyServerTransport implements ServerTransport {
*/ */
private NettyServerHandler createHandler(ServerTransportListener transportListener) { private NettyServerHandler createHandler(ServerTransportListener transportListener) {
Http2Connection connection = new DefaultHttp2Connection(true); Http2Connection connection = new DefaultHttp2Connection(true);
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, getClass());
Http2FrameReader frameReader = Http2FrameReader frameReader =
new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), frameLogger); new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), frameLogger);
Http2FrameWriter frameWriter = Http2FrameWriter frameWriter =

View File

@ -111,7 +111,7 @@ public class BufferingHttp2ConnectionEncoderTest {
when(configuration.frameSizePolicy()).thenReturn(frameSizePolicy); when(configuration.frameSizePolicy()).thenReturn(frameSizePolicy);
when(frameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE); when(frameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
when(writer.writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise))).thenAnswer( when(writer.writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise))).thenAnswer(
successAnswer()); successAnswer());
when(writer.writeGoAway(eq(ctx), anyInt(), anyLong(), any(ByteBuf.class), eq(promise))) when(writer.writeGoAway(eq(ctx), anyInt(), anyLong(), any(ByteBuf.class), eq(promise)))
.thenAnswer(successAnswer()); .thenAnswer(successAnswer());
@ -132,6 +132,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void multipleWritesToActiveStream() { public void multipleWritesToActiveStream() {
encoder.writeSettingsAck(ctx, promise);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
assertEquals(0, encoder.numBufferedStreams()); assertEquals(0, encoder.numBufferedStreams());
encoder.writeData(ctx, 3, data(), 0, false, promise); encoder.writeData(ctx, 3, data(), 0, false, promise);
@ -146,6 +147,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void ensureCanCreateNextStreamWhenStreamCloses() { public void ensureCanCreateNextStreamWhenStreamCloses() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(1); connection.local().maxActiveStreams(1);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -172,6 +174,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void alternatingWritesToActiveAndBufferedStreams() { public void alternatingWritesToActiveAndBufferedStreams() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(1); connection.local().maxActiveStreams(1);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -190,6 +193,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void bufferingNewStreamFailsAfterGoAwayReceived() { public void bufferingNewStreamFailsAfterGoAwayReceived() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0); connection.local().maxActiveStreams(0);
connection.goAwayReceived(1, 8, null); connection.goAwayReceived(1, 8, null);
@ -201,6 +205,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void receivingGoAwayFailsBufferedStreams() { public void receivingGoAwayFailsBufferedStreams() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(5); connection.local().maxActiveStreams(5);
int streamId = 3; int streamId = 3;
@ -220,6 +225,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void sendingGoAwayShouldNotFailStreams() { public void sendingGoAwayShouldNotFailStreams() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(1); connection.local().maxActiveStreams(1);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -239,6 +245,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void endStreamDoesNotFailBufferedStream() { public void endStreamDoesNotFailBufferedStream() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0); connection.local().maxActiveStreams(0);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -262,6 +269,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void rstStreamClosesBufferedStream() { public void rstStreamClosesBufferedStream() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0); connection.local().maxActiveStreams(0);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -277,6 +285,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void bufferUntilActiveStreamsAreReset() { public void bufferUntilActiveStreamsAreReset() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(1); connection.local().maxActiveStreams(1);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -303,6 +312,7 @@ public class BufferingHttp2ConnectionEncoderTest {
@Test @Test
public void bufferUntilMaxStreamsIncreased() { public void bufferUntilMaxStreamsIncreased() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(2); connection.local().maxActiveStreams(2);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);
@ -332,8 +342,45 @@ public class BufferingHttp2ConnectionEncoderTest {
assertEquals(5, connection.local().numActiveStreams()); assertEquals(5, connection.local().numActiveStreams());
} }
@Test
public void bufferUntilSettingsReceived() {
encoderWriteHeaders(3, promise);
encoderWriteHeaders(5, promise);
encoderWriteHeaders(7, promise);
encoderWriteHeaders(9, promise);
encoderWriteHeaders(11, promise);
encoderWriteHeaders(13, promise);
encoderWriteHeaders(15, promise);
encoderWriteHeaders(17, promise);
encoderWriteHeaders(19, promise);
encoderWriteHeaders(21, promise);
encoderWriteHeaders(23, promise);
assertEquals(1, encoder.numBufferedStreams());
writeVerifyWriteHeaders(times(1), 3, promise);
writeVerifyWriteHeaders(times(1), 5, promise);
writeVerifyWriteHeaders(times(1), 7, promise);
writeVerifyWriteHeaders(times(1), 9, promise);
writeVerifyWriteHeaders(times(1), 11, promise);
writeVerifyWriteHeaders(times(1), 13, promise);
writeVerifyWriteHeaders(times(1), 15, promise);
writeVerifyWriteHeaders(times(1), 17, promise);
writeVerifyWriteHeaders(times(1), 19, promise);
writeVerifyWriteHeaders(times(1), 21, promise);
writeVerifyWriteHeaders(never(), 23, promise);
// Simulate that we received a SETTINGS frame.
encoder.writeSettingsAck(ctx, promise);
assertEquals(0, encoder.numBufferedStreams());
writeVerifyWriteHeaders(times(1), 23, promise);
assertEquals(11, connection.local().numActiveStreams());
}
@Test @Test
public void closedBufferedStreamReleasesByteBuf() { public void closedBufferedStreamReleasesByteBuf() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0); connection.local().maxActiveStreams(0);
ByteBuf data = mock(ByteBuf.class); ByteBuf data = mock(ByteBuf.class);
encoderWriteHeaders(3, promise); encoderWriteHeaders(3, promise);

View File

@ -401,8 +401,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
Http2Connection connection = new DefaultHttp2Connection(false); Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
DefaultHttp2ConnectionEncoder encoder = BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter); new DefaultHttp2ConnectionEncoder(connection, frameWriter));
return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize,
streamWindowSize); streamWindowSize);
} }