From 13fe13bf9f672496d997b327ba33283e19c95846 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 11 Feb 2016 23:10:26 -0800 Subject: [PATCH] okhttp: Enable transport test OkHttpClientTransport has a fix for shutdown during start which prevented transportTerminated from being called. It also no longer fails pending streams during shutdown. Lifecycle management in general was revamped to be hopefully simpler and more precise. In the process GOAWAY handling (both sending and receiving) was improved. With some of the changes, the log spam generated was immense and unhelpful (since many exceptions are part of normal operation on shutdown). This change reduces the amount of log spam to nothing. --- okhttp/build.gradle | 3 +- .../java/io/grpc/okhttp/AsyncFrameWriter.java | 2 - .../io/grpc/okhttp/OkHttpClientTransport.java | 182 +++++++++--------- .../io/grpc/internal/AccessProtectedHack.java | 42 ++++ .../okhttp/OkHttpChannelProviderTest.java | 10 +- .../okhttp/OkHttpClientTransportTest.java | 27 ++- .../io/grpc/okhttp/OkHttpTransportTest.java | 84 ++++++++ 7 files changed, 243 insertions(+), 107 deletions(-) create mode 100644 okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java create mode 100644 okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java diff --git a/okhttp/build.gradle b/okhttp/build.gradle index 87351b5abb..b0177c2a1d 100644 --- a/okhttp/build.gradle +++ b/okhttp/build.gradle @@ -10,7 +10,8 @@ dependencies { // Tests depend on base class defined by core module. testCompile project(':grpc-core').sourceSets.test.output, - project(":grpc-testing") + project(':grpc-testing'), + project(':grpc-netty') } project.sourceSets { diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java index 57eb772017..75db1dfdbf 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java @@ -238,10 +238,8 @@ class AsyncFrameWriter implements FrameWriter { doRun(); } catch (RuntimeException e) { transport.onException(e); - throw e; } catch (Exception e) { transport.onException(e); - throw new RuntimeException(e); } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 0e661a36ad..2b1c9b8df0 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -63,6 +63,8 @@ import okio.BufferedSink; import okio.BufferedSource; import okio.ByteString; import okio.Okio; +import okio.Source; +import okio.Timeout; import java.io.IOException; import java.net.InetSocketAddress; @@ -141,17 +143,15 @@ class OkHttpClientTransport implements ManagedClientTransport { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; - // Indicates the transport is in go-away state: no new streams will be processed, - // but existing streams may continue. - @GuardedBy("lock") - private boolean goAway; - // Used to indicate the special phase while we are going to enter go-away state but before - // goAway is turned to true, see the comment at where this is set about why it is needed. - @GuardedBy("lock") - private boolean startedGoAway; + /** + * Indicates the transport is in go-away state: no new streams will be processed, but existing + * streams may continue. + */ @GuardedBy("lock") private Status goAwayStatus; @GuardedBy("lock") + private boolean goAwaySent; + @GuardedBy("lock") private Http2Ping ping; @GuardedBy("lock") private boolean stopped; @@ -253,7 +253,7 @@ class OkHttpClientTransport implements ManagedClientTransport { @GuardedBy("lock") void streamReadyToStart(OkHttpClientStream clientStream) { synchronized (lock) { - if (goAway) { + if (goAwayStatus != null) { clientStream.transportReportStatus(goAwayStatus, true, new Metadata()); } else if (streams.size() >= maxConcurrentStreams) { pendingStreams.add(clientStream); @@ -278,7 +278,8 @@ class OkHttpClientTransport implements ManagedClientTransport { // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs // correctly. nextStreamId = Integer.MAX_VALUE; - startGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream ids exhausted")); + startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, + Status.UNAVAILABLE.withDescription("Stream ids exhausted")); } else { nextStreamId += 2; } @@ -287,16 +288,13 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * Starts pending streams, returns true if at least one pending stream is started. */ + @GuardedBy("lock") private boolean startPendingStreams() { boolean hasStreamStarted = false; - synchronized (lock) { - // No need to check goAway since the pendingStreams will be cleared when goAway - // becomes true. - while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { - OkHttpClientStream stream = pendingStreams.poll(); - startStream(stream); - hasStreamStarted = true; - } + while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { + OkHttpClientStream stream = pendingStreams.poll(); + startStream(stream); + hasStreamStarted = true; } return hasStreamStarted; } @@ -329,14 +327,29 @@ class OkHttpClientTransport implements ManagedClientTransport { executor.execute(clientFrameHandler); synchronized (lock) { maxConcurrentStreams = Integer.MAX_VALUE; + startPendingStreams(); } frameWriter.becomeConnected(testFrameWriter, socket); - startPendingStreams(); connectedFuture.set(null); return; } - BufferedSource source; + // Use closed source on failure so that the reader immediately shuts down. + BufferedSource source = Okio.buffer(new Source() { + @Override + public long read(Buffer sink, long byteCount) { + return -1; + } + + @Override + public Timeout timeout() { + return Timeout.NONE; + } + + @Override + public void close() {} + }); + Variant variant = new Http2(); BufferedSink sink; Socket sock; try { @@ -348,32 +361,21 @@ class OkHttpClientTransport implements ManagedClientTransport { sock.setTcpNoDelay(true); source = Okio.buffer(Okio.source(sock)); sink = Okio.buffer(Okio.sink(sock)); - } catch (RuntimeException e) { - onException(e); - throw e; } catch (Exception e) { onException(e); - - // (and probably do all of this work asynchronously instead of in calling thread) - throw new RuntimeException(e); + return; + } finally { + clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); + executor.execute(clientFrameHandler); } FrameWriter rawFrameWriter; synchronized (lock) { - if (stopped) { - // In case user called shutdown() during the connecting. - try { - sock.close(); - } catch (IOException e) { - log.log(Level.WARNING, "Failed closing socket", e); - } - return; - } socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; + startPendingStreams(); } - Variant variant = new Http2(); rawFrameWriter = variant.newWriter(sink, true); frameWriter.becomeConnected(rawFrameWriter, socket); @@ -383,17 +385,10 @@ class OkHttpClientTransport implements ManagedClientTransport { rawFrameWriter.connectionPreface(); Settings settings = new Settings(); rawFrameWriter.settings(settings); - } catch (RuntimeException e) { - onException(e); - throw e; } catch (Exception e) { onException(e); - throw new RuntimeException(e); + return; } - - clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); - executor.execute(clientFrameHandler); - startPendingStreams(); } }); } @@ -444,16 +439,14 @@ class OkHttpClientTransport implements ManagedClientTransport { @Override public void shutdown() { synchronized (lock) { - if (goAway) { + if (goAwayStatus != null) { return; } + + goAwayStatus = Status.UNAVAILABLE.withDescription("Transport stopped"); + listener.transportShutdown(goAwayStatus); + stopIfNecessary(); } - - // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams. - // The GOAWAY is part of graceful shutdown. - frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); - - startGoAway(Integer.MAX_VALUE, Status.UNAVAILABLE.withDescription("Transport stopped")); } /** @@ -480,40 +473,34 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * Finish all active streams due to an IOException, then close the transport. */ - void onException(Throwable cause) { - log.log(Level.WARNING, "Transport failed", cause); - Status status = Status.UNAVAILABLE.withCause(cause); - if (cause != null) { + void onException(Throwable failureCause) { + Status status = Status.UNAVAILABLE.withCause(failureCause); + if (failureCause != null) { status = status.augmentDescription("No provided cause"); } - startGoAway(0, status); + startGoAway(0, ErrorCode.INTERNAL_ERROR, status); } /** * Send GOAWAY to the server, then finish all active streams and close the transport. */ private void onError(ErrorCode errorCode, String moreDetail) { - frameWriter.goAway(0, errorCode, new byte[0]); - startGoAway(0, toGrpcStatus(errorCode).augmentDescription(moreDetail)); + startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail)); } - private void startGoAway(int lastKnownStreamId, Status status) { + private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) { synchronized (lock) { - if (startedGoAway) { - // Another go-away is in progress, ignore this one. - return; + if (goAwayStatus == null) { + goAwayStatus = status; + listener.transportShutdown(status); + } + if (errorCode != null && !goAwaySent) { + // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated + // streams. The GOAWAY is part of graceful shutdown. + goAwaySent = true; + frameWriter.goAway(0, errorCode, new byte[0]); } - // We use startedGoAway here instead of goAway, because once the goAway becomes true, other - // thread in stopIfNecessary() may stop the transport and cause the - // listener.transportTerminated() be called before listener.transportShutdown(). - startedGoAway = true; - } - listener.transportShutdown(status); - - synchronized (lock) { - goAway = true; - goAwayStatus = status; Iterator> it = streams.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); @@ -527,9 +514,9 @@ class OkHttpClientTransport implements ManagedClientTransport { stream.transportReportStatus(status, true, new Metadata()); } pendingStreams.clear(); - } - stopIfNecessary(); + stopIfNecessary(); + } } /** @@ -567,22 +554,31 @@ class OkHttpClientTransport implements ManagedClientTransport { /** * When the transport is in goAway state, we should stop it once all active streams finish. */ + @GuardedBy("lock") void stopIfNecessary() { - synchronized (lock) { - if (goAway && streams.size() == 0) { - if (!stopped) { - stopped = true; - // We will close the underlying socket in the writing thread to break out the reader - // thread, which will close the frameReader and notify the listener. - frameWriter.close(); - - if (ping != null) { - ping.failed(getPingFailure()); - ping = null; - } - } - } + if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) { + return; } + if (stopped) { + return; + } + stopped = true; + + if (ping != null) { + ping.failed(getPingFailure()); + ping = null; + } + + if (!goAwaySent) { + // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated + // streams. The GOAWAY is part of graceful shutdown. + goAwaySent = true; + frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); + } + + // We will close the underlying socket in the writing thread to break out the reader + // thread, which will close the frameReader and notify the listener. + frameWriter.close(); } private Throwable getPingFailure() { @@ -640,14 +636,11 @@ class OkHttpClientTransport implements ManagedClientTransport { // frameReader.nextFrame() returns false when the underlying read encounters an IOException, // it may be triggered by the socket closing, in such case, the startGoAway() will do // nothing, otherwise, we finish all streams since it's a real IO issue. - // We don't call onException() here since we don't want to log the warning in case this is - // triggered by socket closing. - startGoAway(0, + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE.withDescription("End of stream or IOException")); } catch (Exception t) { // TODO(madongfly): Send the exception message to the server. - frameWriter.goAway(0, ErrorCode.PROTOCOL_ERROR, new byte[0]); - onException(t); + startGoAway(0, ErrorCode.PROTOCOL_ERROR, Status.UNAVAILABLE.withCause(t)); } finally { try { frameReader.close(); @@ -746,6 +739,7 @@ class OkHttpClientTransport implements ManagedClientTransport { listener.transportReady(); firstSettings = false; } + startPendingStreams(); } frameWriter.ackSettings(settings); @@ -791,7 +785,7 @@ class OkHttpClientTransport implements ManagedClientTransport { // If a debug message was provided, use it. status = status.augmentDescription(debugData.utf8()); } - startGoAway(lastGoodStreamId, status); + startGoAway(lastGoodStreamId, null, status); } @Override diff --git a/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java new file mode 100644 index 0000000000..bd3a05ffc2 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/internal/AccessProtectedHack.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +/** A hack to access protected methods from io.grpc.internal. */ +public final class AccessProtectedHack { + public static InternalServer serverBuilderBuildTransportServer( + AbstractServerImplBuilder builder) { + return builder.buildTransportServer(); + } + + private AccessProtectedHack() {} +} diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java index 1f14516d7c..95b16cedc5 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelProviderTest.java @@ -33,6 +33,7 @@ package io.grpc.okhttp; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import io.grpc.ManagedChannelProvider; @@ -40,6 +41,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.ServiceLoader; + /** Unit tests for {@link OkHttpChannelProvider}. */ @RunWith(JUnit4.class) public class OkHttpChannelProviderTest { @@ -47,7 +50,12 @@ public class OkHttpChannelProviderTest { @Test public void provided() { - assertSame(OkHttpChannelProvider.class, ManagedChannelProvider.provider().getClass()); + for (ManagedChannelProvider current : ServiceLoader.load(ManagedChannelProvider.class)) { + if (current instanceof OkHttpChannelProvider) { + return; + } + } + fail("ServiceLoader unable to load OkHttpChannelProvider"); } @Test diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 267aa405fb..ed429e676d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -664,7 +664,6 @@ public class OkHttpClientTransportTest { stream2.start(listener2); assertEquals(2, activeStreamCount()); clientTransport.shutdown(); - verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); assertEquals(2, activeStreamCount()); verify(transportListener).transportShutdown(isA(Status.class)); @@ -676,6 +675,7 @@ public class OkHttpClientTransportTest { assertEquals(0, activeStreamCount()); assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); + verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); shutdownAndVerify(); } @@ -857,7 +857,7 @@ public class OkHttpClientTransportTest { } @Test - public void pendingStreamFailedByShutdown() throws Exception { + public void pendingStreamSucceedAfterShutdown() throws Exception { initTransport(); setMaxConcurrentStreams(0); final MockStreamListener listener = new MockStreamListener(); @@ -867,10 +867,11 @@ public class OkHttpClientTransportTest { waitForStreamPending(1); clientTransport.shutdown(); - - listener.waitUntilStreamClosed(); - assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); - assertEquals(0, clientTransport.getPendingStreamSize()); + setMaxConcurrentStreams(1); + verify(frameWriter, timeout(TIME_OUT_MS)) + .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); + assertEquals(1, activeStreamCount()); + stream.sendCancel(Status.CANCELLED); shutdownAndVerify(); } @@ -901,7 +902,7 @@ public class OkHttpClientTransportTest { stream1.cancel(Status.CANCELLED); listener1.waitUntilStreamClosed(); listener3.waitUntilStreamClosed(); - assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode()); + assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode()); assertEquals(0, clientTransport.getPendingStreamSize()); assertEquals(1, activeStreamCount()); stream2 = getStream(startId + 2); @@ -1294,10 +1295,14 @@ public class OkHttpClientTransportTest { clientTransport.shutdown(); allowTransportConnected(); - // The new stream should be failed, as well as the pending stream. + // The new stream should be failed, but not the pending stream. assertNewStreamFail(); + verify(frameWriter, timeout(TIME_OUT_MS)) + .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); + assertEquals(1, activeStreamCount()); + stream.cancel(Status.CANCELLED); listener.waitUntilStreamClosed(); - assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); + assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); shutdownAndVerify(); } @@ -1414,6 +1419,10 @@ public class OkHttpClientTransportTest { CONTENT_TYPE_HEADER); } + private static List
anyListHeader() { + return any(); + } + private static class MockFrameReader implements FrameReader { CountDownLatch closed = new CountDownLatch(1); boolean throwExceptionForNextFrame; diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java new file mode 100644 index 0000000000..65d482ab1d --- /dev/null +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.okhttp; + +import io.grpc.internal.AccessProtectedHack; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.testing.AbstractTransportTest; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.testing.TestUtils; + +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.net.InetSocketAddress; + +/** Unit tests for OkHttp transport. */ +@RunWith(JUnit4.class) +public class OkHttpTransportTest extends AbstractTransportTest { + private static final int SERVER_PORT = TestUtils.pickUnusedPort(); + private ClientTransportFactory clientFactory = OkHttpChannelBuilder + // Although specified here, address is ignored because we never call build. + .forAddress("127.0.0.1", SERVER_PORT) + .negotiationType(NegotiationType.PLAINTEXT) + .buildTransportFactory(); + + @After + public void releaseClientFactory() { + clientFactory.close(); + } + + @Override + protected InternalServer newServer() { + return AccessProtectedHack.serverBuilderBuildTransportServer( + NettyServerBuilder + .forPort(SERVER_PORT) + .flowControlWindow(65 * 1024)); + } + + @Override + protected ManagedClientTransport newClientTransport() { + return clientFactory.newClientTransport( + new InetSocketAddress("127.0.0.1", SERVER_PORT), "127.0.0.1:" + SERVER_PORT); + } + + // TODO(ejona): Flaky/Broken + @Test + @Ignore + @Override + public void flowControlPushBack() {} +}