diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 2649b63c9e..3a1feae039 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -754,7 +754,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { // nothing, otherwise, we finish all streams since it's a real IO issue. startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE.withDescription("End of stream or IOException")); - } catch (Exception t) { + } catch (Throwable t) { // TODO(madongfly): Send the exception message to the server. startGoAway(0, ErrorCode.PROTOCOL_ERROR, Status.UNAVAILABLE.withCause(t)); } finally { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 816c8cf2d0..2a78f68c01 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -123,6 +123,7 @@ import javax.annotation.Nullable; public class OkHttpClientTransportTest { private static final int TIME_OUT_MS = 2000; private static final String NETWORK_ISSUE_MESSAGE = "network issue"; + private static final String ERROR_MESSAGE = "simulated error"; // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length. private static final int HEADER_LENGTH = 5; @@ -243,6 +244,7 @@ public class OkHttpClientTransportTest { frameReader.throwIoExceptionForNextFrame(); listener1.waitUntilStreamClosed(); listener2.waitUntilStreamClosed(); + assertEquals(0, activeStreamCount()); assertEquals(Status.UNAVAILABLE.getCode(), listener1.status.getCode()); assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage()); @@ -253,6 +255,31 @@ public class OkHttpClientTransportTest { shutdownAndVerify(); } + /** + * Test that even if an Error is thrown from the reading loop of the transport, + * it can still clean up and call transportShutdown() and transportTerminated() as expected + * by the channel. + */ + @Test + public void nextFrameThrowsError() throws Exception { + initTransport(); + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = clientTransport.newStream(method, new Metadata()); + stream.start(listener); + stream.request(1); + assertEquals(1, activeStreamCount()); + assertContainStream(3); + frameReader.throwErrorForNextFrame(); + listener.waitUntilStreamClosed(); + + assertEquals(0, activeStreamCount()); + assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); + assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage()); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); + verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); + shutdownAndVerify(); + } + @Test public void nextFrameReturnFalse() throws Exception { initTransport(); @@ -1426,6 +1453,7 @@ public class OkHttpClientTransportTest { CountDownLatch closed = new CountDownLatch(1); boolean throwExceptionForNextFrame; boolean returnFalseInNextFrame; + boolean throwErrorForNextFrame; @Override public void close() throws IOException { @@ -1445,6 +1473,9 @@ public class OkHttpClientTransportTest { @Override public synchronized boolean nextFrame(Handler handler) throws IOException { + if (throwErrorForNextFrame) { + throw new Error(ERROR_MESSAGE); + } if (throwExceptionForNextFrame) { throw new IOException(NETWORK_ISSUE_MESSAGE); } @@ -1468,6 +1499,11 @@ public class OkHttpClientTransportTest { notifyAll(); } + synchronized void throwErrorForNextFrame() { + throwErrorForNextFrame = true; + notifyAll(); + } + synchronized void nextFrameAtEndOfStream() { returnFalseInNextFrame = true; notifyAll();