mirror of https://github.com/grpc/grpc-java.git
okHttp: Set max_concurrent_stream to 0 before the connection is connected.
So that the written messages will be queued inside the pending stream instead of the serializingExecutor.
This commit is contained in:
parent
522580dd0e
commit
b1e2aaebc0
|
|
@ -154,7 +154,7 @@ class OkHttpClientTransport implements ClientTransport {
|
||||||
private SSLSocketFactory sslSocketFactory;
|
private SSLSocketFactory sslSocketFactory;
|
||||||
private Socket socket;
|
private Socket socket;
|
||||||
@GuardedBy("lock")
|
@GuardedBy("lock")
|
||||||
private int maxConcurrentStreams = Integer.MAX_VALUE;
|
private int maxConcurrentStreams = 0;
|
||||||
@GuardedBy("lock")
|
@GuardedBy("lock")
|
||||||
private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>();
|
private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>();
|
||||||
private final ConnectionSpec connectionSpec;
|
private final ConnectionSpec connectionSpec;
|
||||||
|
|
@ -322,9 +322,14 @@ class OkHttpClientTransport implements ClientTransport {
|
||||||
clientFrameHandler = new ClientFrameHandler(testFrameReader);
|
clientFrameHandler = new ClientFrameHandler(testFrameReader);
|
||||||
executor.execute(clientFrameHandler);
|
executor.execute(clientFrameHandler);
|
||||||
connectedCallback.run();
|
connectedCallback.run();
|
||||||
|
synchronized (lock) {
|
||||||
|
maxConcurrentStreams = Integer.MAX_VALUE;
|
||||||
|
}
|
||||||
frameWriter.becomeConnected(testFrameWriter, socket);
|
frameWriter.becomeConnected(testFrameWriter, socket);
|
||||||
|
startPendingStreams();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
BufferedSource source;
|
BufferedSource source;
|
||||||
BufferedSink sink;
|
BufferedSink sink;
|
||||||
Socket sock;
|
Socket sock;
|
||||||
|
|
@ -355,6 +360,7 @@ class OkHttpClientTransport implements ClientTransport {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
socket = sock;
|
socket = sock;
|
||||||
|
maxConcurrentStreams = Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
Variant variant = new Http2();
|
Variant variant = new Http2();
|
||||||
|
|
@ -377,6 +383,7 @@ class OkHttpClientTransport implements ClientTransport {
|
||||||
OkHttpClientTransport.this.listener.transportReady();
|
OkHttpClientTransport.this.listener.transportReady();
|
||||||
}
|
}
|
||||||
executor.execute(clientFrameHandler);
|
executor.execute(clientFrameHandler);
|
||||||
|
startPendingStreams();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -144,8 +144,9 @@ public class OkHttpClientTransportTest {
|
||||||
frameReader = new MockFrameReader();
|
frameReader = new MockFrameReader();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initTransport() {
|
private void initTransport() throws Exception {
|
||||||
initTransport(3, new ConnectedCallback(false));
|
initTransport(3, new ConnectedCallback(false));
|
||||||
|
connectedCallback.waitUntilConnected();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initTransport(int startId, ConnectedCallback connectedCallback) {
|
private void initTransport(int startId, ConnectedCallback connectedCallback) {
|
||||||
|
|
@ -1091,17 +1092,7 @@ public class OkHttpClientTransportTest {
|
||||||
verifyNoMoreInteractions(frameWriter);
|
verifyNoMoreInteractions(frameWriter);
|
||||||
|
|
||||||
connectedCallback.allowConnected();
|
connectedCallback.allowConnected();
|
||||||
|
verifyNoMoreInteractions(frameWriter);
|
||||||
// There should be 4 pending operations
|
|
||||||
verify(frameWriter, timeout(TIME_OUT_MS)).synStream(
|
|
||||||
eq(false), eq(false), eq(3), eq(0), Matchers.<List<Header>>any());
|
|
||||||
verify(frameWriter, timeout(TIME_OUT_MS)).flush();
|
|
||||||
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
|
|
||||||
|
|
||||||
// TODO(madongfly): Is this really what we want, we may just throw away the messages of
|
|
||||||
// a cancelled stream.
|
|
||||||
verify(frameWriter, timeout(TIME_OUT_MS))
|
|
||||||
.data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -1114,17 +1105,10 @@ public class OkHttpClientTransportTest {
|
||||||
clientTransport.shutdown();
|
clientTransport.shutdown();
|
||||||
connectedCallback.allowConnected();
|
connectedCallback.allowConnected();
|
||||||
|
|
||||||
// The new stream should be failed, but the started stream should not be affected.
|
// The new stream should be failed, as well as the pending stream.
|
||||||
assertNewStreamFail();
|
assertNewStreamFail();
|
||||||
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
|
listener.waitUntilStreamClosed();
|
||||||
stream.writeMessage(input);
|
assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
|
||||||
stream.flush();
|
|
||||||
ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
|
|
||||||
verify(frameWriter, timeout(TIME_OUT_MS))
|
|
||||||
.data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH));
|
|
||||||
Buffer sentFrame = captor.getValue();
|
|
||||||
assertEquals(createMessageFrame(message), sentFrame);
|
|
||||||
stream.cancel(Status.CANCELLED);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int activeStreamCount() {
|
private int activeStreamCount() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue