diff --git a/okhttp/build.gradle b/okhttp/build.gradle index 87351b5abb..b0177c2a1d 100644 --- a/okhttp/build.gradle +++ b/okhttp/build.gradle @@ -10,7 +10,8 @@ dependencies { // Tests depend on base class defined by core module. testCompile project(':grpc-core').sourceSets.test.output, - project(":grpc-testing") + project(':grpc-testing'), + project(':grpc-netty') } project.sourceSets { diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java index 57eb772017..75db1dfdbf 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java @@ -238,10 +238,8 @@ class AsyncFrameWriter implements FrameWriter { doRun(); } catch (RuntimeException e) { transport.onException(e); - throw e; } catch (Exception e) { transport.onException(e); - throw new RuntimeException(e); } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 0e661a36ad..2b1c9b8df0 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -63,6 +63,8 @@ import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; import okio.Okio; +import okio.Source; +import okio.Timeout; import java.io.IOException; import java.net.InetSocketAddress; @@ -141,17 +143,15 @@ class OkHttpClientTransport implements ManagedClientTransport { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; - // Indicates the transport is in go-away state: no new streams will be processed, - // but existing streams may continue. - @GuardedBy("lock") - private boolean goAway; - // Used to indicate the special phase while we are going to enter go-away state but before - // goAway is turned to true, see the comment at where this is set about why it is needed. - @GuardedBy("lock") - private boolean startedGoAway; + /** + * Indicates the transport is in go-away state: no new streams will be processed, but existing + * streams may continue. + */ @GuardedBy("lock") private Status goAwayStatus; @GuardedBy("lock") + private boolean goAwaySent; + @GuardedBy("lock") private Http2Ping ping; @GuardedBy("lock") private boolean stopped; @@ -253,7 +253,7 @@ class OkHttpClientTransport implements ManagedClientTransport { @GuardedBy("lock") void streamReadyToStart(OkHttpClientStream clientStream) { synchronized (lock) { - if (goAway) { + if (goAwayStatus != null) { clientStream.transportReportStatus(goAwayStatus, true, new Metadata()); } else if (streams.size() >= maxConcurrentStreams) { pendingStreams.add(clientStream); @@ -278,7 +278,8 @@ class OkHttpClientTransport implements ManagedClientTransport { // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs // correctly. nextStreamId = Integer.MAX_VALUE; - startGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream ids exhausted")); + startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, + Status.UNAVAILABLE.withDescription("Stream ids exhausted")); } else { nextStreamId += 2; } @@ -287,16 +288,13 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * Starts pending streams, returns true if at least one pending stream is started. */ + @GuardedBy("lock") private boolean startPendingStreams() { boolean hasStreamStarted = false; - synchronized (lock) { - // No need to check goAway since the pendingStreams will be cleared when goAway - // becomes true. - while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { - OkHttpClientStream stream = pendingStreams.poll(); - startStream(stream); - hasStreamStarted = true; - } + while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { + OkHttpClientStream stream = pendingStreams.poll(); + startStream(stream); + hasStreamStarted = true; } return hasStreamStarted; } @@ -329,14 +327,29 @@ class OkHttpClientTransport implements ManagedClientTransport { executor.execute(clientFrameHandler); synchronized (lock) { maxConcurrentStreams = Integer.MAX_VALUE; + startPendingStreams(); } frameWriter.becomeConnected(testFrameWriter, socket); - startPendingStreams(); connectedFuture.set(null); return; } - BufferedSource source; + // Use closed source on failure so that the reader immediately shuts down. + BufferedSource source = Okio.buffer(new Source() { + @Override + public long read(Buffer sink, long byteCount) { + return -1; + } + + @Override + public Timeout timeout() { + return Timeout.NONE; + } + + @Override + public void close() {} + }); + Variant variant = new Http2(); BufferedSink sink; Socket sock; try { @@ -348,32 +361,21 @@ class OkHttpClientTransport implements ManagedClientTransport { sock.setTcpNoDelay(true); source = Okio.buffer(Okio.source(sock)); sink = Okio.buffer(Okio.sink(sock)); - } catch (RuntimeException e) { - onException(e); - throw e; } catch (Exception e) { onException(e); - - // (and probably do all of this work asynchronously instead of in calling thread) - throw new RuntimeException(e); + return; + } finally { + clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); + executor.execute(clientFrameHandler); } FrameWriter rawFrameWriter; synchronized (lock) { - if (stopped) { - // In case user called shutdown() during the connecting. - try { - sock.close(); - } catch (IOException e) { - log.log(Level.WARNING, "Failed closing socket", e); - } - return; - } socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; + startPendingStreams(); } - Variant variant = new Http2(); rawFrameWriter = variant.newWriter(sink, true); frameWriter.becomeConnected(rawFrameWriter, socket); @@ -383,17 +385,10 @@ class OkHttpClientTransport implements ManagedClientTransport { rawFrameWriter.connectionPreface(); Settings settings = new Settings(); rawFrameWriter.settings(settings); - } catch (RuntimeException e) { - onException(e); - throw e; } catch (Exception e) { onException(e); - throw new RuntimeException(e); + return; } - - clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); - executor.execute(clientFrameHandler); - startPendingStreams(); } }); } @@ -444,16 +439,14 @@ class OkHttpClientTransport implements ManagedClientTransport { @Override public void shutdown() { synchronized (lock) { - if (goAway) { + if (goAwayStatus != null) { return; } + + goAwayStatus = Status.UNAVAILABLE.withDescription("Transport stopped"); + listener.transportShutdown(goAwayStatus); + stopIfNecessary(); } - - // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams. - // The GOAWAY is part of graceful shutdown. - frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); - - startGoAway(Integer.MAX_VALUE, Status.UNAVAILABLE.withDescription("Transport stopped")); } /** @@ -480,40 +473,34 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * Finish all active streams due to an IOException, then close the transport. */ - void onException(Throwable cause) { - log.log(Level.WARNING, "Transport failed", cause); - Status status = Status.UNAVAILABLE.withCause(cause); - if (cause != null) { + void onException(Throwable failureCause) { + Status status = Status.UNAVAILABLE.withCause(failureCause); + if (failureCause != null) { status = status.augmentDescription("No provided cause"); } - startGoAway(0, status); + startGoAway(0, ErrorCode.INTERNAL_ERROR, status); } /** * Send GOAWAY to the server, then finish all active streams and close the transport. */ private void onError(ErrorCode errorCode, String moreDetail) { - frameWriter.goAway(0, errorCode, new byte[0]); - startGoAway(0, toGrpcStatus(errorCode).augmentDescription(moreDetail)); + startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail)); } - private void startGoAway(int lastKnownStreamId, Status status) { + private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) { synchronized (lock) { - if (startedGoAway) { - // Another go-away is in progress, ignore this one. - return; + if (goAwayStatus == null) { + goAwayStatus = status; + listener.transportShutdown(status); + } + if (errorCode != null && !goAwaySent) { + // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated + // streams. The GOAWAY is part of graceful shutdown. + goAwaySent = true; + frameWriter.goAway(0, errorCode, new byte[0]); } - // We use startedGoAway here instead of goAway, because once the goAway becomes true, other - // thread in stopIfNecessary() may stop the transport and cause the - // listener.transportTerminated() be called before listener.transportShutdown(). - startedGoAway = true; - } - listener.transportShutdown(status); - - synchronized (lock) { - goAway = true; - goAwayStatus = status; Iterator> it = streams.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); @@ -527,9 +514,9 @@ class OkHttpClientTransport implements ManagedClientTransport { stream.transportReportStatus(status, true, new Metadata()); } pendingStreams.clear(); - } - stopIfNecessary(); + stopIfNecessary(); + } } /** @@ -567,22 +554,31 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * When the transport is in goAway state, we should stop it once all active streams finish. */ + @GuardedBy("lock") void stopIfNecessary() { - synchronized (lock) { - if (goAway && streams.size() == 0) { - if (!stopped) { - stopped = true; - // We will close the underlying socket in the writing thread to break out the reader - // thread, which will close the frameReader and notify the listener. - frameWriter.close(); - - if (ping != null) { - ping.failed(getPingFailure()); - ping = null; - } - } - } + if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) { + return; } + if (stopped) { + return; + } + stopped = true; + + if (ping != null) { + ping.failed(getPingFailure()); + ping = null; + } + + if (!goAwaySent) { + // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated + // streams. The GOAWAY is part of graceful shutdown. + goAwaySent = true; + frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); + } + + // We will close the underlying socket in the writing thread to break out the reader + // thread, which will close the frameReader and notify the listener. + frameWriter.close(); } private Throwable getPingFailure() { @@ -640,14 +636,11 @@ class OkHttpClientTransport implements ManagedClientTransport { // frameReader.nextFrame() returns false when the underlying read encounters an IOException, // it may be triggered by the socket closing, in such case, the startGoAway() will do // nothing, otherwise, we finish all streams since it's a real IO issue. - // We don't call onException() here since we don't want to log the warning in case this is - // triggered by socket closing. - startGoAway(0, + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE.withDescription("End of stream or IOException")); } catch (Exception t) { // TODO(madongfly): Send the exception message to the server. - frameWriter.goAway(0, ErrorCode.PROTOCOL_ERROR, new byte[0]); - onException(t); + startGoAway(0, ErrorCode.PROTOCOL_ERROR, Status.UNAVAILABLE.withCause(t)); } finally { try { frameReader.close(); @@ -746,6 +739,7 @@ class OkHttpClientTransport implements ManagedClientTransport { listener.transportReady(); firstSettings = false; } + startPendingStreams(); } frameWriter.ackSettings(settings); @@ -791,7 +785,7 @@ class OkHttpClientTransport implements ManagedClientTransport { // If a debug message was provided, use it. status = status.augmentDescription(debugData.utf8()); } - startGoAway(lastGoodStreamId, status); + startGoAway(lastGoodStreamId, null, status); } @Override diff --git a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java new file mode 100644 index 0000000000..bd3a05ffc2 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +/** A hack to access protected methods from io.grpc.internal. */ +public final class AccessProtectedHack { + public static InternalServer serverBuilderBuildTransportServer( + AbstractServerImplBuilder builder) { + return builder.buildTransportServer(); + } + + private AccessProtectedHack() {} +} diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java index 1f14516d7c..95b16cedc5 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java @@ -33,6 +33,7 @@ package io.grpc.okhttp; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import io.grpc.ManagedChannelProvider; @@ -40,6 +41,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.ServiceLoader; + /** Unit tests for {@link OkHttpChannelProvider}. */ @RunWith(JUnit4.class) public class OkHttpChannelProviderTest { @@ -47,7 +50,12 @@ public class OkHttpChannelProviderTest { @Test public void provided() { - assertSame(OkHttpChannelProvider.class, ManagedChannelProvider.provider().getClass()); + for (ManagedChannelProvider current : ServiceLoader.load(ManagedChannelProvider.class)) { + if (current instanceof OkHttpChannelProvider) { + return; + } + } + fail("ServiceLoader unable to load OkHttpChannelProvider"); } @Test diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 267aa405fb..ed429e676d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -664,7 +664,6 @@ public class OkHttpClientTransportTest { stream2.start(listener2); assertEquals(2, activeStreamCount()); clientTransport.shutdown(); - verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); assertEquals(2, activeStreamCount()); verify(transportListener).transportShutdown(isA(Status.class)); @@ -676,6 +675,7 @@ public class OkHttpClientTransportTest { assertEquals(0, activeStreamCount()); assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); + verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -857,7 +857,7 @@ public class OkHttpClientTransportTest { } @Test - public void pendingStreamFailedByShutdown() throws Exception { + public void pendingStreamSucceedAfterShutdown() throws Exception { initTransport(); setMaxConcurrentStreams(0); final MockStreamListener listener = new MockStreamListener(); @@ -867,10 +867,11 @@ public class OkHttpClientTransportTest { waitForStreamPending(1); clientTransport.shutdown(); - - listener.waitUntilStreamClosed(); - assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); - assertEquals(0, clientTransport.getPendingStreamSize()); + setMaxConcurrentStreams(1); + verify(frameWriter, timeout(TIME_OUT_MS)) + .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); + assertEquals(1, activeStreamCount()); + stream.sendCancel(Status.CANCELLED); shutdownAndVerify(); } @@ -901,7 +902,7 @@ public class OkHttpClientTransportTest { stream1.cancel(Status.CANCELLED); listener1.waitUntilStreamClosed(); listener3.waitUntilStreamClosed(); - assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode()); + assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode()); assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(1, activeStreamCount()); stream2 = getStream(startId + 2); @@ -1294,10 +1295,14 @@ public class OkHttpClientTransportTest { clientTransport.shutdown(); allowTransportConnected(); - // The new stream should be failed, as well as the pending stream. + // The new stream should be failed, but not the pending stream. assertNewStreamFail(); + verify(frameWriter, timeout(TIME_OUT_MS)) + .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); + assertEquals(1, activeStreamCount()); + stream.cancel(Status.CANCELLED); listener.waitUntilStreamClosed(); - assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); + assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); shutdownAndVerify(); } @@ -1414,6 +1419,10 @@ public class OkHttpClientTransportTest { CONTENT_TYPE_HEADER); } + private static List
anyListHeader() { + return any(); + } + private static class MockFrameReader implements FrameReader { CountDownLatch closed = new CountDownLatch(1); boolean throwExceptionForNextFrame; diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java new file mode 100644 index 0000000000..65d482ab1d --- /dev/null +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.okhttp; + +import io.grpc.internal.AccessProtectedHack; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.testing.AbstractTransportTest; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.testing.TestUtils; + +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.net.InetSocketAddress; + +/** Unit tests for OkHttp transport. */ +@RunWith(JUnit4.class) +public class OkHttpTransportTest extends AbstractTransportTest { + private static final int SERVER_PORT = TestUtils.pickUnusedPort(); + private ClientTransportFactory clientFactory = OkHttpChannelBuilder + // Although specified here, address is ignored because we never call build. + .forAddress("127.0.0.1", SERVER_PORT) + .negotiationType(NegotiationType.PLAINTEXT) + .buildTransportFactory(); + + @After + public void releaseClientFactory() { + clientFactory.close(); + } + + @Override + protected InternalServer newServer() { + return AccessProtectedHack.serverBuilderBuildTransportServer( + NettyServerBuilder + .forPort(SERVER_PORT) + .flowControlWindow(65 * 1024)); + } + + @Override + protected ManagedClientTransport newClientTransport() { + return clientFactory.newClientTransport( + new InetSocketAddress("127.0.0.1", SERVER_PORT), "127.0.0.1:" + SERVER_PORT); + } + + // TODO(ejona): Flaky/Broken + @Test + @Ignore + @Override + public void flowControlPushBack() {} +}