diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 8c8ef855bb..f3f8d53666 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -154,7 +154,7 @@ class OkHttpClientTransport implements ClientTransport { private SSLSocketFactory sslSocketFactory; private Socket socket; @GuardedBy("lock") - private int maxConcurrentStreams = Integer.MAX_VALUE; + private int maxConcurrentStreams = 0; @GuardedBy("lock") private LinkedList pendingStreams = new LinkedList(); private final ConnectionSpec connectionSpec; @@ -322,9 +322,14 @@ class OkHttpClientTransport implements ClientTransport { clientFrameHandler = new ClientFrameHandler(testFrameReader); executor.execute(clientFrameHandler); connectedCallback.run(); + synchronized (lock) { + maxConcurrentStreams = Integer.MAX_VALUE; + } frameWriter.becomeConnected(testFrameWriter, socket); + startPendingStreams(); return; } + BufferedSource source; BufferedSink sink; Socket sock; @@ -355,6 +360,7 @@ class OkHttpClientTransport implements ClientTransport { return; } socket = sock; + maxConcurrentStreams = Integer.MAX_VALUE; } Variant variant = new Http2(); @@ -377,6 +383,7 @@ class OkHttpClientTransport implements ClientTransport { OkHttpClientTransport.this.listener.transportReady(); } executor.execute(clientFrameHandler); + startPendingStreams(); } }); } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 2a86c28ed1..aa24506a61 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -144,8 +144,9 @@ public class OkHttpClientTransportTest { frameReader = new MockFrameReader(); } - private void initTransport() { + private void initTransport() throws Exception { initTransport(3, new ConnectedCallback(false)); + connectedCallback.waitUntilConnected(); } private void initTransport(int startId, ConnectedCallback connectedCallback) { @@ -1091,17 +1092,7 @@ public class OkHttpClientTransportTest { verifyNoMoreInteractions(frameWriter); connectedCallback.allowConnected(); - - // There should be 4 pending operations - verify(frameWriter, timeout(TIME_OUT_MS)).synStream( - eq(false), eq(false), eq(3), eq(0), Matchers.>any()); - verify(frameWriter, timeout(TIME_OUT_MS)).flush(); - verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); - - // TODO(madongfly): Is this really what we want, we may just throw away the messages of - // a cancelled stream. - verify(frameWriter, timeout(TIME_OUT_MS)) - .data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH)); + verifyNoMoreInteractions(frameWriter); } @Test @@ -1114,17 +1105,10 @@ public class OkHttpClientTransportTest { clientTransport.shutdown(); connectedCallback.allowConnected(); - // The new stream should be failed, but the started stream should not be affected. + // The new stream should be failed, as well as the pending stream. assertNewStreamFail(); - InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); - stream.writeMessage(input); - stream.flush(); - ArgumentCaptor captor = ArgumentCaptor.forClass(Buffer.class); - verify(frameWriter, timeout(TIME_OUT_MS)) - .data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH)); - Buffer sentFrame = captor.getValue(); - assertEquals(createMessageFrame(message), sentFrame); - stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); } private int activeStreamCount() {