mirror of https://github.com/grpc/grpc-java.git
Always checking MAX_CONCURRENT_STREAMS in
BufferingHttp2ConnectionEncoder. Currently we don't check this setting when handling a streamClosed() event. If the setting has lowered prior to this event, the stream creation could fail.
This commit is contained in:
parent
a45e0a4767
commit
7f73189e56
|
|
@ -87,7 +87,7 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStreamClosed(Http2Stream stream) {
|
public void onStreamClosed(Http2Stream stream) {
|
||||||
createNextPendingStream();
|
tryCreatePendingStreams();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -178,19 +178,12 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
|
||||||
|
|
||||||
private void tryCreatePendingStreams() {
|
private void tryCreatePendingStreams() {
|
||||||
while (!pendingStreams.isEmpty() && connection().local().canCreateStream()) {
|
while (!pendingStreams.isEmpty() && connection().local().canCreateStream()) {
|
||||||
createNextPendingStream();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createNextPendingStream() {
|
|
||||||
Map.Entry<Integer, PendingStream> entry = pendingStreams.pollFirstEntry();
|
Map.Entry<Integer, PendingStream> entry = pendingStreams.pollFirstEntry();
|
||||||
if (entry == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
PendingStream pendingStream = entry.getValue();
|
PendingStream pendingStream = entry.getValue();
|
||||||
pendingStream.sendFrames();
|
pendingStream.sendFrames();
|
||||||
largestCreatedStreamId = pendingStream.streamId;
|
largestCreatedStreamId = pendingStream.streamId;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void cancelPendingStreams() {
|
private void cancelPendingStreams() {
|
||||||
Exception e = new Exception("Connection closed.");
|
Exception e = new Exception("Connection closed.");
|
||||||
|
|
|
||||||
|
|
@ -143,6 +143,28 @@ public class BufferingHttp2ConnectionEncoderTest {
|
||||||
.writeData(eq(ctx), eq(3), any(ByteBuf.class), eq(0), eq(false), eq(promise));
|
.writeData(eq(ctx), eq(3), any(ByteBuf.class), eq(0), eq(false), eq(promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void ensureCanCreateNextStreamWhenStreamCloses() {
|
||||||
|
connection.local().maxActiveStreams(1);
|
||||||
|
|
||||||
|
encoderWriteHeaders(3, promise);
|
||||||
|
// This one gets buffered.
|
||||||
|
encoderWriteHeaders(5, promise);
|
||||||
|
assertEquals(1, connection.numActiveStreams());
|
||||||
|
|
||||||
|
// Now prevent us from creating another stream.
|
||||||
|
connection.local().maxActiveStreams(0);
|
||||||
|
|
||||||
|
// Close the previous stream.
|
||||||
|
connection.stream(3).close();
|
||||||
|
|
||||||
|
// Ensure that no streams are currently active and that only the HEADERS from the first
|
||||||
|
// stream were written.
|
||||||
|
writeVerifyWriteHeaders(times(1), 3, promise);
|
||||||
|
writeVerifyWriteHeaders(never(), 5, promise);
|
||||||
|
assertEquals(0, connection.numActiveStreams());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void alternatingWritesToActiveAndBufferedStreams() {
|
public void alternatingWritesToActiveAndBufferedStreams() {
|
||||||
connection.local().maxActiveStreams(1);
|
connection.local().maxActiveStreams(1);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue