From 7f73189e56b575ef6e57fda0da1eb46f1cd4ec8c Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 5 May 2015 16:22:24 -0700 Subject: [PATCH] Always checking MAX_CONCURRENT_STREAMS in BufferingHttp2ConnectionEncoder. Currently we don't check this setting when handling a streamClosed() event. If the setting has lowered prior to this event, the stream creation could fail. --- .../BufferingHttp2ConnectionEncoder.java | 17 +++++--------- .../BufferingHttp2ConnectionEncoderTest.java | 22 +++++++++++++++++++ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java index 972b9f8782..6372b8c948 100644 --- a/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java +++ b/netty/src/main/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoder.java @@ -87,7 +87,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { @Override public void onStreamClosed(Http2Stream stream) { - createNextPendingStream(); + tryCreatePendingStreams(); } }); } @@ -178,20 +178,13 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { private void tryCreatePendingStreams() { while (!pendingStreams.isEmpty() && connection().local().canCreateStream()) { - createNextPendingStream(); + Map.Entry entry = pendingStreams.pollFirstEntry(); + PendingStream pendingStream = entry.getValue(); + pendingStream.sendFrames(); + largestCreatedStreamId = pendingStream.streamId; } } - private void createNextPendingStream() { - Map.Entry entry = pendingStreams.pollFirstEntry(); - if (entry == null) { - return; - } - PendingStream pendingStream = entry.getValue(); - pendingStream.sendFrames(); - largestCreatedStreamId = pendingStream.streamId; - } - private void cancelPendingStreams() { Exception e = new Exception("Connection closed."); while (!pendingStreams.isEmpty()) { diff --git a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java index fd42b0a291..d19950944d 100644 --- a/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/BufferingHttp2ConnectionEncoderTest.java @@ -143,6 +143,28 @@ public class BufferingHttp2ConnectionEncoderTest { .writeData(eq(ctx), eq(3), any(ByteBuf.class), eq(0), eq(false), eq(promise)); } + @Test + public void ensureCanCreateNextStreamWhenStreamCloses() { + connection.local().maxActiveStreams(1); + + encoderWriteHeaders(3, promise); + // This one gets buffered. + encoderWriteHeaders(5, promise); + assertEquals(1, connection.numActiveStreams()); + + // Now prevent us from creating another stream. + connection.local().maxActiveStreams(0); + + // Close the previous stream. + connection.stream(3).close(); + + // Ensure that no streams are currently active and that only the HEADERS from the first + // stream were written. + writeVerifyWriteHeaders(times(1), 3, promise); + writeVerifyWriteHeaders(never(), 5, promise); + assertEquals(0, connection.numActiveStreams()); + } + @Test public void alternatingWritesToActiveAndBufferedStreams() { connection.local().maxActiveStreams(1);