mirror of https://github.com/grpc/grpc-java.git
okhttp: Enable transport test
OkHttpClientTransport has a fix for shutdown during start which prevented transportTerminated from being called. It also no longer fails pending streams during shutdown. Lifecycle management in general was revamped to be hopefully simpler and more precise. In the process GOAWAY handling (both sending and receiving) was improved. With some of the changes, the log spam generated was immense and unhelpful (since many exceptions are part of normal operation on shutdown). This change reduces the amount of log spam to nothing.
This commit is contained in:
parent
188840b920
commit
13fe13bf9f
|
|
@ -10,7 +10,8 @@ dependencies {
|
|||
|
||||
// Tests depend on base class defined by core module.
|
||||
testCompile project(':grpc-core').sourceSets.test.output,
|
||||
project(":grpc-testing")
|
||||
project(':grpc-testing'),
|
||||
project(':grpc-netty')
|
||||
}
|
||||
|
||||
project.sourceSets {
|
||||
|
|
|
|||
|
|
@ -238,10 +238,8 @@ class AsyncFrameWriter implements FrameWriter {
|
|||
doRun();
|
||||
} catch (RuntimeException e) {
|
||||
transport.onException(e);
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
transport.onException(e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -63,6 +63,8 @@ import okio.BufferedSink;
|
|||
import okio.BufferedSource;
|
||||
import okio.ByteString;
|
||||
import okio.Okio;
|
||||
import okio.Source;
|
||||
import okio.Timeout;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
|
@ -141,17 +143,15 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
private final int maxMessageSize;
|
||||
private int connectionUnacknowledgedBytesRead;
|
||||
private ClientFrameHandler clientFrameHandler;
|
||||
// Indicates the transport is in go-away state: no new streams will be processed,
|
||||
// but existing streams may continue.
|
||||
@GuardedBy("lock")
|
||||
private boolean goAway;
|
||||
// Used to indicate the special phase while we are going to enter go-away state but before
|
||||
// goAway is turned to true, see the comment at where this is set about why it is needed.
|
||||
@GuardedBy("lock")
|
||||
private boolean startedGoAway;
|
||||
/**
|
||||
* Indicates the transport is in go-away state: no new streams will be processed, but existing
|
||||
* streams may continue.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
private Status goAwayStatus;
|
||||
@GuardedBy("lock")
|
||||
private boolean goAwaySent;
|
||||
@GuardedBy("lock")
|
||||
private Http2Ping ping;
|
||||
@GuardedBy("lock")
|
||||
private boolean stopped;
|
||||
|
|
@ -253,7 +253,7 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
@GuardedBy("lock")
|
||||
void streamReadyToStart(OkHttpClientStream clientStream) {
|
||||
synchronized (lock) {
|
||||
if (goAway) {
|
||||
if (goAwayStatus != null) {
|
||||
clientStream.transportReportStatus(goAwayStatus, true, new Metadata());
|
||||
} else if (streams.size() >= maxConcurrentStreams) {
|
||||
pendingStreams.add(clientStream);
|
||||
|
|
@ -278,7 +278,8 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
// Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
|
||||
// correctly.
|
||||
nextStreamId = Integer.MAX_VALUE;
|
||||
startGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream ids exhausted"));
|
||||
startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
|
||||
Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
|
||||
} else {
|
||||
nextStreamId += 2;
|
||||
}
|
||||
|
|
@ -287,16 +288,13 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
/**
|
||||
* Starts pending streams, returns true if at least one pending stream is started.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
private boolean startPendingStreams() {
|
||||
boolean hasStreamStarted = false;
|
||||
synchronized (lock) {
|
||||
// No need to check goAway since the pendingStreams will be cleared when goAway
|
||||
// becomes true.
|
||||
while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
|
||||
OkHttpClientStream stream = pendingStreams.poll();
|
||||
startStream(stream);
|
||||
hasStreamStarted = true;
|
||||
}
|
||||
while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
|
||||
OkHttpClientStream stream = pendingStreams.poll();
|
||||
startStream(stream);
|
||||
hasStreamStarted = true;
|
||||
}
|
||||
return hasStreamStarted;
|
||||
}
|
||||
|
|
@ -329,14 +327,29 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
executor.execute(clientFrameHandler);
|
||||
synchronized (lock) {
|
||||
maxConcurrentStreams = Integer.MAX_VALUE;
|
||||
startPendingStreams();
|
||||
}
|
||||
frameWriter.becomeConnected(testFrameWriter, socket);
|
||||
startPendingStreams();
|
||||
connectedFuture.set(null);
|
||||
return;
|
||||
}
|
||||
|
||||
BufferedSource source;
|
||||
// Use closed source on failure so that the reader immediately shuts down.
|
||||
BufferedSource source = Okio.buffer(new Source() {
|
||||
@Override
|
||||
public long read(Buffer sink, long byteCount) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Timeout timeout() {
|
||||
return Timeout.NONE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
});
|
||||
Variant variant = new Http2();
|
||||
BufferedSink sink;
|
||||
Socket sock;
|
||||
try {
|
||||
|
|
@ -348,32 +361,21 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
sock.setTcpNoDelay(true);
|
||||
source = Okio.buffer(Okio.source(sock));
|
||||
sink = Okio.buffer(Okio.sink(sock));
|
||||
} catch (RuntimeException e) {
|
||||
onException(e);
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
onException(e);
|
||||
|
||||
// (and probably do all of this work asynchronously instead of in calling thread)
|
||||
throw new RuntimeException(e);
|
||||
return;
|
||||
} finally {
|
||||
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
|
||||
executor.execute(clientFrameHandler);
|
||||
}
|
||||
|
||||
FrameWriter rawFrameWriter;
|
||||
synchronized (lock) {
|
||||
if (stopped) {
|
||||
// In case user called shutdown() during the connecting.
|
||||
try {
|
||||
sock.close();
|
||||
} catch (IOException e) {
|
||||
log.log(Level.WARNING, "Failed closing socket", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
socket = sock;
|
||||
maxConcurrentStreams = Integer.MAX_VALUE;
|
||||
startPendingStreams();
|
||||
}
|
||||
|
||||
Variant variant = new Http2();
|
||||
rawFrameWriter = variant.newWriter(sink, true);
|
||||
frameWriter.becomeConnected(rawFrameWriter, socket);
|
||||
|
||||
|
|
@ -383,17 +385,10 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
rawFrameWriter.connectionPreface();
|
||||
Settings settings = new Settings();
|
||||
rawFrameWriter.settings(settings);
|
||||
} catch (RuntimeException e) {
|
||||
onException(e);
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
onException(e);
|
||||
throw new RuntimeException(e);
|
||||
return;
|
||||
}
|
||||
|
||||
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
|
||||
executor.execute(clientFrameHandler);
|
||||
startPendingStreams();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -444,16 +439,14 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
@Override
|
||||
public void shutdown() {
|
||||
synchronized (lock) {
|
||||
if (goAway) {
|
||||
if (goAwayStatus != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
goAwayStatus = Status.UNAVAILABLE.withDescription("Transport stopped");
|
||||
listener.transportShutdown(goAwayStatus);
|
||||
stopIfNecessary();
|
||||
}
|
||||
|
||||
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
|
||||
// The GOAWAY is part of graceful shutdown.
|
||||
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
|
||||
|
||||
startGoAway(Integer.MAX_VALUE, Status.UNAVAILABLE.withDescription("Transport stopped"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -480,40 +473,34 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
/**
|
||||
* Finish all active streams due to an IOException, then close the transport.
|
||||
*/
|
||||
void onException(Throwable cause) {
|
||||
log.log(Level.WARNING, "Transport failed", cause);
|
||||
Status status = Status.UNAVAILABLE.withCause(cause);
|
||||
if (cause != null) {
|
||||
void onException(Throwable failureCause) {
|
||||
Status status = Status.UNAVAILABLE.withCause(failureCause);
|
||||
if (failureCause != null) {
|
||||
status = status.augmentDescription("No provided cause");
|
||||
}
|
||||
startGoAway(0, status);
|
||||
startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send GOAWAY to the server, then finish all active streams and close the transport.
|
||||
*/
|
||||
private void onError(ErrorCode errorCode, String moreDetail) {
|
||||
frameWriter.goAway(0, errorCode, new byte[0]);
|
||||
startGoAway(0, toGrpcStatus(errorCode).augmentDescription(moreDetail));
|
||||
startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
|
||||
}
|
||||
|
||||
private void startGoAway(int lastKnownStreamId, Status status) {
|
||||
private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
|
||||
synchronized (lock) {
|
||||
if (startedGoAway) {
|
||||
// Another go-away is in progress, ignore this one.
|
||||
return;
|
||||
if (goAwayStatus == null) {
|
||||
goAwayStatus = status;
|
||||
listener.transportShutdown(status);
|
||||
}
|
||||
if (errorCode != null && !goAwaySent) {
|
||||
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
|
||||
// streams. The GOAWAY is part of graceful shutdown.
|
||||
goAwaySent = true;
|
||||
frameWriter.goAway(0, errorCode, new byte[0]);
|
||||
}
|
||||
// We use startedGoAway here instead of goAway, because once the goAway becomes true, other
|
||||
// thread in stopIfNecessary() may stop the transport and cause the
|
||||
// listener.transportTerminated() be called before listener.transportShutdown().
|
||||
startedGoAway = true;
|
||||
}
|
||||
|
||||
listener.transportShutdown(status);
|
||||
|
||||
synchronized (lock) {
|
||||
goAway = true;
|
||||
goAwayStatus = status;
|
||||
Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<Integer, OkHttpClientStream> entry = it.next();
|
||||
|
|
@ -527,9 +514,9 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
stream.transportReportStatus(status, true, new Metadata());
|
||||
}
|
||||
pendingStreams.clear();
|
||||
}
|
||||
|
||||
stopIfNecessary();
|
||||
stopIfNecessary();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -567,22 +554,31 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
/**
|
||||
* When the transport is in goAway state, we should stop it once all active streams finish.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
void stopIfNecessary() {
|
||||
synchronized (lock) {
|
||||
if (goAway && streams.size() == 0) {
|
||||
if (!stopped) {
|
||||
stopped = true;
|
||||
// We will close the underlying socket in the writing thread to break out the reader
|
||||
// thread, which will close the frameReader and notify the listener.
|
||||
frameWriter.close();
|
||||
|
||||
if (ping != null) {
|
||||
ping.failed(getPingFailure());
|
||||
ping = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
|
||||
return;
|
||||
}
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
|
||||
if (ping != null) {
|
||||
ping.failed(getPingFailure());
|
||||
ping = null;
|
||||
}
|
||||
|
||||
if (!goAwaySent) {
|
||||
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
|
||||
// streams. The GOAWAY is part of graceful shutdown.
|
||||
goAwaySent = true;
|
||||
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
|
||||
}
|
||||
|
||||
// We will close the underlying socket in the writing thread to break out the reader
|
||||
// thread, which will close the frameReader and notify the listener.
|
||||
frameWriter.close();
|
||||
}
|
||||
|
||||
private Throwable getPingFailure() {
|
||||
|
|
@ -640,14 +636,11 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
// frameReader.nextFrame() returns false when the underlying read encounters an IOException,
|
||||
// it may be triggered by the socket closing, in such case, the startGoAway() will do
|
||||
// nothing, otherwise, we finish all streams since it's a real IO issue.
|
||||
// We don't call onException() here since we don't want to log the warning in case this is
|
||||
// triggered by socket closing.
|
||||
startGoAway(0,
|
||||
startGoAway(0, ErrorCode.INTERNAL_ERROR,
|
||||
Status.UNAVAILABLE.withDescription("End of stream or IOException"));
|
||||
} catch (Exception t) {
|
||||
// TODO(madongfly): Send the exception message to the server.
|
||||
frameWriter.goAway(0, ErrorCode.PROTOCOL_ERROR, new byte[0]);
|
||||
onException(t);
|
||||
startGoAway(0, ErrorCode.PROTOCOL_ERROR, Status.UNAVAILABLE.withCause(t));
|
||||
} finally {
|
||||
try {
|
||||
frameReader.close();
|
||||
|
|
@ -746,6 +739,7 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
listener.transportReady();
|
||||
firstSettings = false;
|
||||
}
|
||||
startPendingStreams();
|
||||
}
|
||||
|
||||
frameWriter.ackSettings(settings);
|
||||
|
|
@ -791,7 +785,7 @@ class OkHttpClientTransport implements ManagedClientTransport {
|
|||
// If a debug message was provided, use it.
|
||||
status = status.augmentDescription(debugData.utf8());
|
||||
}
|
||||
startGoAway(lastGoodStreamId, status);
|
||||
startGoAway(lastGoodStreamId, null, status);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
/** A hack to access protected methods from io.grpc.internal. */
|
||||
public final class AccessProtectedHack {
|
||||
public static InternalServer serverBuilderBuildTransportServer(
|
||||
AbstractServerImplBuilder<?> builder) {
|
||||
return builder.buildTransportServer();
|
||||
}
|
||||
|
||||
private AccessProtectedHack() {}
|
||||
}
|
||||
|
|
@ -33,6 +33,7 @@ package io.grpc.okhttp;
|
|||
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import io.grpc.ManagedChannelProvider;
|
||||
|
||||
|
|
@ -40,6 +41,8 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
import java.util.ServiceLoader;
|
||||
|
||||
/** Unit tests for {@link OkHttpChannelProvider}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class OkHttpChannelProviderTest {
|
||||
|
|
@ -47,7 +50,12 @@ public class OkHttpChannelProviderTest {
|
|||
|
||||
@Test
|
||||
public void provided() {
|
||||
assertSame(OkHttpChannelProvider.class, ManagedChannelProvider.provider().getClass());
|
||||
for (ManagedChannelProvider current : ServiceLoader.load(ManagedChannelProvider.class)) {
|
||||
if (current instanceof OkHttpChannelProvider) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
fail("ServiceLoader unable to load OkHttpChannelProvider");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -664,7 +664,6 @@ public class OkHttpClientTransportTest {
|
|||
stream2.start(listener2);
|
||||
assertEquals(2, activeStreamCount());
|
||||
clientTransport.shutdown();
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
|
||||
|
||||
assertEquals(2, activeStreamCount());
|
||||
verify(transportListener).transportShutdown(isA(Status.class));
|
||||
|
|
@ -676,6 +675,7 @@ public class OkHttpClientTransportTest {
|
|||
assertEquals(0, activeStreamCount());
|
||||
assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode());
|
||||
assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
|
||||
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
|
||||
shutdownAndVerify();
|
||||
}
|
||||
|
|
@ -857,7 +857,7 @@ public class OkHttpClientTransportTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void pendingStreamFailedByShutdown() throws Exception {
|
||||
public void pendingStreamSucceedAfterShutdown() throws Exception {
|
||||
initTransport();
|
||||
setMaxConcurrentStreams(0);
|
||||
final MockStreamListener listener = new MockStreamListener();
|
||||
|
|
@ -867,10 +867,11 @@ public class OkHttpClientTransportTest {
|
|||
waitForStreamPending(1);
|
||||
|
||||
clientTransport.shutdown();
|
||||
|
||||
listener.waitUntilStreamClosed();
|
||||
assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
|
||||
assertEquals(0, clientTransport.getPendingStreamSize());
|
||||
setMaxConcurrentStreams(1);
|
||||
verify(frameWriter, timeout(TIME_OUT_MS))
|
||||
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
|
||||
assertEquals(1, activeStreamCount());
|
||||
stream.sendCancel(Status.CANCELLED);
|
||||
shutdownAndVerify();
|
||||
}
|
||||
|
||||
|
|
@ -901,7 +902,7 @@ public class OkHttpClientTransportTest {
|
|||
stream1.cancel(Status.CANCELLED);
|
||||
listener1.waitUntilStreamClosed();
|
||||
listener3.waitUntilStreamClosed();
|
||||
assertEquals(Status.INTERNAL.getCode(), listener3.status.getCode());
|
||||
assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode());
|
||||
assertEquals(0, clientTransport.getPendingStreamSize());
|
||||
assertEquals(1, activeStreamCount());
|
||||
stream2 = getStream(startId + 2);
|
||||
|
|
@ -1294,10 +1295,14 @@ public class OkHttpClientTransportTest {
|
|||
clientTransport.shutdown();
|
||||
allowTransportConnected();
|
||||
|
||||
// The new stream should be failed, as well as the pending stream.
|
||||
// The new stream should be failed, but not the pending stream.
|
||||
assertNewStreamFail();
|
||||
verify(frameWriter, timeout(TIME_OUT_MS))
|
||||
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
|
||||
assertEquals(1, activeStreamCount());
|
||||
stream.cancel(Status.CANCELLED);
|
||||
listener.waitUntilStreamClosed();
|
||||
assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
|
||||
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
|
||||
shutdownAndVerify();
|
||||
}
|
||||
|
||||
|
|
@ -1414,6 +1419,10 @@ public class OkHttpClientTransportTest {
|
|||
CONTENT_TYPE_HEADER);
|
||||
}
|
||||
|
||||
private static List<Header> anyListHeader() {
|
||||
return any();
|
||||
}
|
||||
|
||||
private static class MockFrameReader implements FrameReader {
|
||||
CountDownLatch closed = new CountDownLatch(1);
|
||||
boolean throwExceptionForNextFrame;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.okhttp;
|
||||
|
||||
import io.grpc.internal.AccessProtectedHack;
|
||||
import io.grpc.internal.ClientTransportFactory;
|
||||
import io.grpc.internal.InternalServer;
|
||||
import io.grpc.internal.ManagedClientTransport;
|
||||
import io.grpc.internal.testing.AbstractTransportTest;
|
||||
import io.grpc.netty.NettyServerBuilder;
|
||||
import io.grpc.testing.TestUtils;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/** Unit tests for OkHttp transport. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class OkHttpTransportTest extends AbstractTransportTest {
|
||||
private static final int SERVER_PORT = TestUtils.pickUnusedPort();
|
||||
private ClientTransportFactory clientFactory = OkHttpChannelBuilder
|
||||
// Although specified here, address is ignored because we never call build.
|
||||
.forAddress("127.0.0.1", SERVER_PORT)
|
||||
.negotiationType(NegotiationType.PLAINTEXT)
|
||||
.buildTransportFactory();
|
||||
|
||||
@After
|
||||
public void releaseClientFactory() {
|
||||
clientFactory.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InternalServer newServer() {
|
||||
return AccessProtectedHack.serverBuilderBuildTransportServer(
|
||||
NettyServerBuilder
|
||||
.forPort(SERVER_PORT)
|
||||
.flowControlWindow(65 * 1024));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ManagedClientTransport newClientTransport() {
|
||||
return clientFactory.newClientTransport(
|
||||
new InetSocketAddress("127.0.0.1", SERVER_PORT), "127.0.0.1:" + SERVER_PORT);
|
||||
}
|
||||
|
||||
// TODO(ejona): Flaky/Broken
|
||||
@Test
|
||||
@Ignore
|
||||
@Override
|
||||
public void flowControlPushBack() {}
|
||||
}
|
||||
Loading…
Reference in New Issue