diff --git a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java index 13acdb7d75..3512cfa29e 100644 --- a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java +++ b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java @@ -68,10 +68,10 @@ import java.util.TreeMap; */ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { /** - * The number of new streams we allow to be created before receiving the first {@code SETTINGS} - * frame from the server. + * The assumed minimum value for {@code SETTINGS_MAX_CONCURRENT_STREAMS} as + * recommended by the HTTP/2 spec. */ - private static final int NUM_STREAMS_INITIALLY_ALLOWED = 10; + static final int SMALLEST_MAX_CONCURRENT_STREAMS = 100; /** * Buffer for any streams and corresponding frames that could not be created @@ -79,12 +79,19 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { */ private final TreeMap pendingStreams = new TreeMap(); + private final int initialMaxConcurrentStreams; // Smallest stream id whose corresponding frames do not get buffered. private int largestCreatedStreamId; private boolean receivedSettings; protected BufferingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) { + this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS); + } + + protected BufferingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, + int initialMaxConcurrentStreams) { super(delegate); + this.initialMaxConcurrentStreams = initialMaxConcurrentStreams; connection().addListener(new Http2ConnectionAdapter() { @Override @@ -225,7 +232,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { */ private boolean canCreateStream() { Http2Connection.Endpoint local = connection().local(); - return (receivedSettings || local.numActiveStreams() < NUM_STREAMS_INITIALLY_ALLOWED) + return (receivedSettings || local.numActiveStreams() < initialMaxConcurrentStreams) && local.canCreateStream(); } diff --git a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java index 6d9b9ef1ae..0bea052f34 100644 --- a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java @@ -344,38 +344,23 @@ public class BufferingHttp2ConnectionEncoderTest { @Test public void bufferUntilSettingsReceived() { - encoderWriteHeaders(3, promise); - encoderWriteHeaders(5, promise); - encoderWriteHeaders(7, promise); - encoderWriteHeaders(9, promise); - encoderWriteHeaders(11, promise); - encoderWriteHeaders(13, promise); - encoderWriteHeaders(15, promise); - encoderWriteHeaders(17, promise); - encoderWriteHeaders(19, promise); - encoderWriteHeaders(21, promise); - encoderWriteHeaders(23, promise); - assertEquals(1, encoder.numBufferedStreams()); - - writeVerifyWriteHeaders(times(1), 3, promise); - writeVerifyWriteHeaders(times(1), 5, promise); - writeVerifyWriteHeaders(times(1), 7, promise); - writeVerifyWriteHeaders(times(1), 9, promise); - writeVerifyWriteHeaders(times(1), 11, promise); - writeVerifyWriteHeaders(times(1), 13, promise); - writeVerifyWriteHeaders(times(1), 15, promise); - writeVerifyWriteHeaders(times(1), 17, promise); - writeVerifyWriteHeaders(times(1), 19, promise); - writeVerifyWriteHeaders(times(1), 21, promise); - writeVerifyWriteHeaders(never(), 23, promise); + int initialLimit = BufferingHttp2ConnectionEncoder.SMALLEST_MAX_CONCURRENT_STREAMS; + int numStreams = initialLimit * 2; + for (int ix = 0, nextStreamId = 3; ix < numStreams; ++ix, nextStreamId += 2) { + encoderWriteHeaders(nextStreamId, promise); + if (ix < initialLimit) { + writeVerifyWriteHeaders(times(1), nextStreamId, promise); + } else { + writeVerifyWriteHeaders(never(), nextStreamId, promise); + } + } + assertEquals(numStreams / 2, encoder.numBufferedStreams()); // Simulate that we received a SETTINGS frame. encoder.writeSettingsAck(ctx, promise); assertEquals(0, encoder.numBufferedStreams()); - writeVerifyWriteHeaders(times(1), 23, promise); - - assertEquals(11, connection.local().numActiveStreams()); + assertEquals(numStreams, connection.local().numActiveStreams()); } @Test