okhttp: make flow control window size configurable (#4959)

Make flow control window size in OkHttp configurable.
This commit is contained in:
creamsoup 2018-10-17 17:33:23 -07:00 committed by GitHub
parent e6e3eb8420
commit 7675ce2d47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 126 additions and 53 deletions

View File

@ -61,6 +61,8 @@ import javax.net.ssl.TrustManagerFactory;
public class OkHttpChannelBuilder extends public class OkHttpChannelBuilder extends
AbstractManagedChannelImplBuilder<OkHttpChannelBuilder> { AbstractManagedChannelImplBuilder<OkHttpChannelBuilder> {
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;
/** Identifies the negotiation used for starting up HTTP/2. */ /** Identifies the negotiation used for starting up HTTP/2. */
private enum NegotiationType { private enum NegotiationType {
/** Uses TLS ALPN/NPN negotiation, assumes an SSL connection. */ /** Uses TLS ALPN/NPN negotiation, assumes an SSL connection. */
@ -157,6 +159,7 @@ public class OkHttpChannelBuilder extends
private NegotiationType negotiationType = NegotiationType.TLS; private NegotiationType negotiationType = NegotiationType.TLS;
private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS; private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private boolean keepAliveWithoutCalls; private boolean keepAliveWithoutCalls;
protected OkHttpChannelBuilder(String host, int port) { protected OkHttpChannelBuilder(String host, int port) {
@ -273,6 +276,16 @@ public class OkHttpChannelBuilder extends
return this; return this;
} }
/**
* Sets the flow control window in bytes. If not called, the default value
* is {@link #DEFAULT_FLOW_CONTROL_WINDOW}).
*/
public OkHttpChannelBuilder flowControlWindow(int flowControlWindow) {
Preconditions.checkState(flowControlWindow > 0, "flowControlWindow must be positive");
this.flowControlWindow = flowControlWindow;
return this;
}
/** /**
* {@inheritDoc} * {@inheritDoc}
* *
@ -398,8 +411,8 @@ public class OkHttpChannelBuilder extends
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(transportExecutor, scheduledExecutorService, return new OkHttpTransportFactory(transportExecutor, scheduledExecutorService,
createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(), createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, flowControlWindow,
transportTracerFactory); keepAliveWithoutCalls, transportTracerFactory);
} }
@Override @Override
@ -476,6 +489,7 @@ public class OkHttpChannelBuilder extends
private final boolean enableKeepAlive; private final boolean enableKeepAlive;
private final AtomicBackoff keepAliveTimeNanos; private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos; private final long keepAliveTimeoutNanos;
private final int flowControlWindow;
private final boolean keepAliveWithoutCalls; private final boolean keepAliveWithoutCalls;
private final ScheduledExecutorService timeoutService; private final ScheduledExecutorService timeoutService;
private boolean closed; private boolean closed;
@ -489,6 +503,7 @@ public class OkHttpChannelBuilder extends
boolean enableKeepAlive, boolean enableKeepAlive,
long keepAliveTimeNanos, long keepAliveTimeNanos,
long keepAliveTimeoutNanos, long keepAliveTimeoutNanos,
int flowControlWindow,
boolean keepAliveWithoutCalls, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory) { TransportTracer.Factory transportTracerFactory) {
usingSharedScheduler = timeoutService == null; usingSharedScheduler = timeoutService == null;
@ -501,6 +516,7 @@ public class OkHttpChannelBuilder extends
this.enableKeepAlive = enableKeepAlive; this.enableKeepAlive = enableKeepAlive;
this.keepAliveTimeNanos = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos); this.keepAliveTimeNanos = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos);
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.flowControlWindow = flowControlWindow;
this.keepAliveWithoutCalls = keepAliveWithoutCalls; this.keepAliveWithoutCalls = keepAliveWithoutCalls;
usingSharedExecutor = executor == null; usingSharedExecutor = executor == null;
@ -537,6 +553,7 @@ public class OkHttpChannelBuilder extends
hostnameVerifier, hostnameVerifier,
connectionSpec, connectionSpec,
maxMessageSize, maxMessageSize,
flowControlWindow,
options.getProxyParameters(), options.getProxyParameters(),
tooManyPingsRunnable, tooManyPingsRunnable,
transportTracerFactory.create()); transportTracerFactory.create());

View File

@ -44,8 +44,6 @@ import okio.Buffer;
*/ */
class OkHttpClientStream extends AbstractClientStream { class OkHttpClientStream extends AbstractClientStream {
private static final int WINDOW_UPDATE_THRESHOLD = Utils.DEFAULT_WINDOW_SIZE / 2;
private static final Buffer EMPTY_BUFFER = new Buffer(); private static final Buffer EMPTY_BUFFER = new Buffer();
public static final int ABSENT_ID = -1; public static final int ABSENT_ID = -1;
@ -71,6 +69,7 @@ class OkHttpClientStream extends AbstractClientStream {
OutboundFlowController outboundFlow, OutboundFlowController outboundFlow,
Object lock, Object lock,
int maxMessageSize, int maxMessageSize,
int initialWindowSize,
String authority, String authority,
String userAgent, String userAgent,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
@ -91,8 +90,15 @@ class OkHttpClientStream extends AbstractClientStream {
// so it is safe to read the transport attributes. // so it is safe to read the transport attributes.
// We make a copy here for convenience, even though we can ask the transport. // We make a copy here for convenience, even though we can ask the transport.
this.attributes = transport.getAttributes(); this.attributes = transport.getAttributes();
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow, this.state =
transport); new TransportState(
maxMessageSize,
statsTraceCtx,
lock,
frameWriter,
outboundFlow,
transport,
initialWindowSize);
} }
@Override @Override
@ -184,6 +190,7 @@ class OkHttpClientStream extends AbstractClientStream {
} }
class TransportState extends Http2ClientStreamTransportState { class TransportState extends Http2ClientStreamTransportState {
private final int initialWindowSize;
private final Object lock; private final Object lock;
@GuardedBy("lock") @GuardedBy("lock")
private List<Header> requestHeaders; private List<Header> requestHeaders;
@ -196,9 +203,9 @@ class OkHttpClientStream extends AbstractClientStream {
@GuardedBy("lock") @GuardedBy("lock")
private boolean cancelSent = false; private boolean cancelSent = false;
@GuardedBy("lock") @GuardedBy("lock")
private int window = Utils.DEFAULT_WINDOW_SIZE; private int window;
@GuardedBy("lock") @GuardedBy("lock")
private int processedWindow = Utils.DEFAULT_WINDOW_SIZE; private int processedWindow;
@GuardedBy("lock") @GuardedBy("lock")
private final AsyncFrameWriter frameWriter; private final AsyncFrameWriter frameWriter;
@GuardedBy("lock") @GuardedBy("lock")
@ -212,12 +219,16 @@ class OkHttpClientStream extends AbstractClientStream {
Object lock, Object lock,
AsyncFrameWriter frameWriter, AsyncFrameWriter frameWriter,
OutboundFlowController outboundFlow, OutboundFlowController outboundFlow,
OkHttpClientTransport transport) { OkHttpClientTransport transport,
int initialWindowSize) {
super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer()); super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer());
this.lock = checkNotNull(lock, "lock"); this.lock = checkNotNull(lock, "lock");
this.frameWriter = frameWriter; this.frameWriter = frameWriter;
this.outboundFlow = outboundFlow; this.outboundFlow = outboundFlow;
this.transport = transport; this.transport = transport;
this.window = initialWindowSize;
this.processedWindow = initialWindowSize;
this.initialWindowSize = initialWindowSize;
} }
@GuardedBy("lock") @GuardedBy("lock")
@ -270,8 +281,8 @@ class OkHttpClientStream extends AbstractClientStream {
@GuardedBy("lock") @GuardedBy("lock")
public void bytesRead(int processedBytes) { public void bytesRead(int processedBytes) {
processedWindow -= processedBytes; processedWindow -= processedBytes;
if (processedWindow <= WINDOW_UPDATE_THRESHOLD) { if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
int delta = Utils.DEFAULT_WINDOW_SIZE - processedWindow; int delta = initialWindowSize - processedWindow;
window += delta; window += delta;
processedWindow += delta; processedWindow += delta;
frameWriter.windowUpdate(id(), delta); frameWriter.windowUpdate(id(), delta);

View File

@ -18,6 +18,7 @@ package io.grpc.okhttp;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
@ -140,6 +141,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
private final Random random = new Random(); private final Random random = new Random();
// Returns new unstarted stopwatches // Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory; private final Supplier<Stopwatch> stopwatchFactory;
private final int initialWindowSize;
private Listener listener; private Listener listener;
private FrameReader testFrameReader; private FrameReader testFrameReader;
private AsyncFrameWriter frameWriter; private AsyncFrameWriter frameWriter;
@ -220,11 +222,12 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent,
Executor executor, @Nullable SSLSocketFactory sslSocketFactory, Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
@Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, int maxMessageSize, int initialWindowSize, @Nullable ProxyParameters proxy,
TransportTracer transportTracer) { Runnable tooManyPingsRunnable, TransportTracer transportTracer) {
this.address = Preconditions.checkNotNull(address, "address"); this.address = Preconditions.checkNotNull(address, "address");
this.defaultAuthority = authority; this.defaultAuthority = authority;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.initialWindowSize = initialWindowSize;
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
serializingExecutor = new SerializingExecutor(executor); serializingExecutor = new SerializingExecutor(executor);
// Client initiated streams are odd, server initiated ones are even. Server should not need to // Client initiated streams are odd, server initiated ones are even. Server should not need to
@ -257,10 +260,12 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
@Nullable Runnable connectingCallback, @Nullable Runnable connectingCallback,
SettableFuture<Void> connectedFuture, SettableFuture<Void> connectedFuture,
int maxMessageSize, int maxMessageSize,
int initialWindowSize,
Runnable tooManyPingsRunnable, Runnable tooManyPingsRunnable,
TransportTracer transportTracer) { TransportTracer transportTracer) {
address = null; address = null;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.initialWindowSize = initialWindowSize;
defaultAuthority = "notarealauthority:80"; defaultAuthority = "notarealauthority:80";
this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent); this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
@ -358,6 +363,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
outboundFlow, outboundFlow,
lock, lock,
maxMessageSize, maxMessageSize,
initialWindowSize,
defaultAuthority, defaultAuthority,
userAgent, userAgent,
statsTraceCtx, statsTraceCtx,
@ -437,7 +443,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
} }
frameWriter = new AsyncFrameWriter(this, serializingExecutor); frameWriter = new AsyncFrameWriter(this, serializingExecutor);
outboundFlow = new OutboundFlowController(this, frameWriter); outboundFlow = new OutboundFlowController(this, frameWriter, initialWindowSize);
// Connecting in the serializingExecutor, so that some stream operations like synStream // Connecting in the serializingExecutor, so that some stream operations like synStream
// will be executed after connected. // will be executed after connected.
serializingExecutor.execute(new Runnable() { serializingExecutor.execute(new Runnable() {
@ -1044,7 +1050,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
// connection window update // connection window update
connectionUnacknowledgedBytesRead += length; connectionUnacknowledgedBytesRead += length;
if (connectionUnacknowledgedBytesRead >= Utils.DEFAULT_WINDOW_SIZE / 2) { if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead); frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
connectionUnacknowledgedBytesRead = 0; connectionUnacknowledgedBytesRead = 0;
} }

View File

@ -17,7 +17,6 @@
package io.grpc.okhttp; package io.grpc.okhttp;
import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID; import static io.grpc.okhttp.Utils.CONNECTION_STREAM_ID;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static java.lang.Math.ceil; import static java.lang.Math.ceil;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
@ -37,12 +36,15 @@ import okio.Buffer;
class OutboundFlowController { class OutboundFlowController {
private final OkHttpClientTransport transport; private final OkHttpClientTransport transport;
private final FrameWriter frameWriter; private final FrameWriter frameWriter;
private int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize;
private final OutboundFlowState connectionState = new OutboundFlowState(CONNECTION_STREAM_ID); private final OutboundFlowState connectionState;
OutboundFlowController(OkHttpClientTransport transport, FrameWriter frameWriter) { OutboundFlowController(
OkHttpClientTransport transport, FrameWriter frameWriter, int initialWindowSize) {
this.transport = Preconditions.checkNotNull(transport, "transport"); this.transport = Preconditions.checkNotNull(transport, "transport");
this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter"); this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
this.initialWindowSize = initialWindowSize;
connectionState = new OutboundFlowState(CONNECTION_STREAM_ID, initialWindowSize);
} }
/** /**
@ -65,7 +67,7 @@ class OutboundFlowController {
OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState(); OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
if (state == null) { if (state == null) {
// Create the OutboundFlowState with the new window size. // Create the OutboundFlowState with the new window size.
state = new OutboundFlowState(stream); state = new OutboundFlowState(stream, initialWindowSize);
stream.setOutboundFlowState(state); stream.setOutboundFlowState(state);
} else { } else {
state.incrementStreamWindow(delta); state.incrementStreamWindow(delta);
@ -158,7 +160,7 @@ class OutboundFlowController {
private OutboundFlowState state(OkHttpClientStream stream) { private OutboundFlowState state(OkHttpClientStream stream) {
OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState(); OutboundFlowState state = (OutboundFlowState) stream.getOutboundFlowState();
if (state == null) { if (state == null) {
state = new OutboundFlowState(stream); state = new OutboundFlowState(stream, initialWindowSize);
stream.setOutboundFlowState(state); stream.setOutboundFlowState(state);
} }
return state; return state;
@ -229,17 +231,18 @@ class OutboundFlowController {
final Queue<Frame> pendingWriteQueue; final Queue<Frame> pendingWriteQueue;
final int streamId; final int streamId;
int queuedBytes; int queuedBytes;
int window = initialWindowSize; int window;
int allocatedBytes; int allocatedBytes;
OkHttpClientStream stream; OkHttpClientStream stream;
OutboundFlowState(int streamId) { OutboundFlowState(int streamId, int initialWindowSize) {
this.streamId = streamId; this.streamId = streamId;
pendingWriteQueue = new ArrayDeque<Frame>(2); window = initialWindowSize;
pendingWriteQueue = new ArrayDeque<>(2);
} }
OutboundFlowState(OkHttpClientStream stream) { OutboundFlowState(OkHttpClientStream stream, int initialWindowSize) {
this(stream.id()); this(stream.id(), initialWindowSize);
this.stream = stream; this.stream = stream;
} }

View File

@ -36,7 +36,11 @@ import java.util.logging.Logger;
class Utils { class Utils {
private static final Logger log = Logger.getLogger(Utils.class.getName()); private static final Logger log = Logger.getLogger(Utils.class.getName());
static final int DEFAULT_WINDOW_SIZE = 65535; /**
* The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
* is sent to expand the window.
*/
static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
static final int CONNECTION_STREAM_ID = 0; static final int CONNECTION_STREAM_ID = 0;
public static Metadata convertHeaders(List<Header> http2Headers) { public static Metadata convertHeaders(List<Header> http2Headers) {

View File

@ -57,6 +57,7 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class OkHttpClientStreamTest { public class OkHttpClientStreamTest {
private static final int MAX_MESSAGE_SIZE = 100; private static final int MAX_MESSAGE_SIZE = 100;
private static final int INITIAL_WINDOW_SIZE = 65535;
@Mock private MethodDescriptor.Marshaller<Void> marshaller; @Mock private MethodDescriptor.Marshaller<Void> marshaller;
@Mock private AsyncFrameWriter frameWriter; @Mock private AsyncFrameWriter frameWriter;
@ -88,6 +89,7 @@ public class OkHttpClientStreamTest {
flowController, flowController,
lock, lock,
MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
"localhost", "localhost",
"userAgent", "userAgent",
StatsTraceContext.NOOP, StatsTraceContext.NOOP,
@ -150,8 +152,8 @@ public class OkHttpClientStreamTest {
Metadata metaData = new Metadata(); Metadata metaData = new Metadata();
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "localhost",
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); "good-application", StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.transportState().start(3); stream.transportState().start(3);
@ -165,8 +167,8 @@ public class OkHttpClientStreamTest {
Metadata metaData = new Metadata(); Metadata metaData = new Metadata();
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application"); metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport, stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "localhost",
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); "good-application", StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
stream.transportState().start(3); stream.transportState().start(3);
@ -193,8 +195,8 @@ public class OkHttpClientStreamTest {
.setResponseMarshaller(marshaller) .setResponseMarshaller(marshaller)
.build(); .build();
stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport, stream = new OkHttpClientStream(getMethod, new Metadata(), frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, "localhost", "good-application", flowController, lock, MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "localhost",
StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT); "good-application", StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener()); stream.start(new BaseClientStreamListener());
// GET streams send headers after halfClose is called. // GET streams send headers after halfClose is called.

View File

@ -125,6 +125,7 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class OkHttpClientTransportTest { public class OkHttpClientTransportTest {
private static final int TIME_OUT_MS = 2000; private static final int TIME_OUT_MS = 2000;
private static final int INITIAL_WINDOW_SIZE = 65535;
private static final String NETWORK_ISSUE_MESSAGE = "network issue"; private static final String NETWORK_ISSUE_MESSAGE = "network issue";
private static final String ERROR_MESSAGE = "simulated error"; private static final String ERROR_MESSAGE = "simulated error";
// The gRPC header length, which includes 1 byte compression flag and 4 bytes message length. // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length.
@ -174,21 +175,28 @@ public class OkHttpClientTransportTest {
} }
private void initTransport() throws Exception { private void initTransport() throws Exception {
startTransport(DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, null);
} }
private void initTransport(int startId) throws Exception { private void initTransport(int startId) throws Exception {
startTransport(startId, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); startTransport(startId, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, null);
} }
private void initTransportAndDelayConnected() throws Exception { private void initTransportAndDelayConnected() throws Exception {
delayConnectedCallback = new DelayConnectedCallback(); delayConnectedCallback = new DelayConnectedCallback();
startTransport( startTransport(
DEFAULT_START_STREAM_ID, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null); DEFAULT_START_STREAM_ID,
delayConnectedCallback,
false,
DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
null);
} }
private void startTransport(int startId, @Nullable Runnable connectingCallback, private void startTransport(int startId, @Nullable Runnable connectingCallback,
boolean waitingForConnected, int maxMessageSize, String userAgent) throws Exception { boolean waitingForConnected, int maxMessageSize, int initialWindowSize, String userAgent)
throws Exception {
connectedFuture = SettableFuture.create(); connectedFuture = SettableFuture.create();
final Ticker ticker = new Ticker() { final Ticker ticker = new Ticker() {
@Override @Override
@ -213,6 +221,7 @@ public class OkHttpClientTransportTest {
connectingCallback, connectingCallback,
connectedFuture, connectedFuture,
maxMessageSize, maxMessageSize,
initialWindowSize,
tooManyPingsRunnable, tooManyPingsRunnable,
new TransportTracer()); new TransportTracer());
clientTransport.start(transportListener); clientTransport.start(transportListener);
@ -233,6 +242,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
OkHttpChannelBuilder.INTERNAL_DEFAULT_CONNECTION_SPEC, OkHttpChannelBuilder.INTERNAL_DEFAULT_CONNECTION_SPEC,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
NO_PROXY, NO_PROXY,
tooManyPingsRunnable, tooManyPingsRunnable,
transportTracer); transportTracer);
@ -244,7 +254,7 @@ public class OkHttpClientTransportTest {
@Test @Test
public void maxMessageSizeShouldBeEnforced() throws Exception { public void maxMessageSizeShouldBeEnforced() throws Exception {
// Allow the response payloads of up to 1 byte. // Allow the response payloads of up to 1 byte.
startTransport(3, null, true, 1, null); startTransport(3, null, true, 1, INITIAL_WINDOW_SIZE, null);
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = OkHttpClientStream stream =
@ -502,7 +512,7 @@ public class OkHttpClientTransportTest {
@Test @Test
public void overrideDefaultUserAgent() throws Exception { public void overrideDefaultUserAgent() throws Exception {
startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, "fakeUserAgent"); startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "fakeUserAgent");
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
@ -557,7 +567,7 @@ public class OkHttpClientTransportTest {
public void transportTracer_windowSizeDefault() throws Exception { public void transportTracer_windowSizeDefault() throws Exception {
initTransport(); initTransport();
TransportStats stats = getTransportStats(clientTransport); TransportStats stats = getTransportStats(clientTransport);
assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); assertEquals(INITIAL_WINDOW_SIZE, stats.remoteFlowControlWindow);
// okhttp does not track local window sizes // okhttp does not track local window sizes
assertEquals(-1, stats.localFlowControlWindow); assertEquals(-1, stats.localFlowControlWindow);
} }
@ -566,13 +576,13 @@ public class OkHttpClientTransportTest {
public void transportTracer_windowSize_remote() throws Exception { public void transportTracer_windowSize_remote() throws Exception {
initTransport(); initTransport();
TransportStats before = getTransportStats(clientTransport); TransportStats before = getTransportStats(clientTransport);
assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); assertEquals(INITIAL_WINDOW_SIZE, before.remoteFlowControlWindow);
// okhttp does not track local window sizes // okhttp does not track local window sizes
assertEquals(-1, before.localFlowControlWindow); assertEquals(-1, before.localFlowControlWindow);
frameHandler().windowUpdate(0, 1000); frameHandler().windowUpdate(0, 1000);
TransportStats after = getTransportStats(clientTransport); TransportStats after = getTransportStats(clientTransport);
assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow); assertEquals(INITIAL_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
// okhttp does not track local window sizes // okhttp does not track local window sizes
assertEquals(-1, after.localFlowControlWindow); assertEquals(-1, after.localFlowControlWindow);
} }
@ -598,7 +608,7 @@ public class OkHttpClientTransportTest {
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 4; int messageLength = INITIAL_WINDOW_SIZE / 4;
byte[] fakeMessage = new byte[messageLength]; byte[] fakeMessage = new byte[messageLength];
// Stream 1 receives a message // Stream 1 receives a message
@ -651,7 +661,7 @@ public class OkHttpClientTransportTest {
OkHttpClientStream stream = OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener); stream.start(listener);
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; int messageLength = INITIAL_WINDOW_SIZE / 2 + 1;
byte[] fakeMessage = new byte[messageLength]; byte[] fakeMessage = new byte[messageLength];
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
@ -679,13 +689,18 @@ public class OkHttpClientTransportTest {
@Test @Test
public void outboundFlowControl() throws Exception { public void outboundFlowControl() throws Exception {
initTransport(); outboundFlowControl(INITIAL_WINDOW_SIZE);
}
private void outboundFlowControl(int windowSize) throws Exception {
startTransport(
DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, windowSize, null);
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener); stream.start(listener);
// The first message should be sent out. // The first message should be sent out.
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; int messageLength = windowSize / 2 + 1;
InputStream input = new ByteArrayInputStream(new byte[messageLength]); InputStream input = new ByteArrayInputStream(new byte[messageLength]);
stream.writeMessage(input); stream.writeMessage(input);
stream.flush(); stream.flush();
@ -698,13 +713,13 @@ public class OkHttpClientTransportTest {
stream.writeMessage(input); stream.writeMessage(input);
stream.flush(); stream.flush();
int partiallySentSize = int partiallySentSize =
Utils.DEFAULT_WINDOW_SIZE - messageLength - HEADER_LENGTH; windowSize - messageLength - HEADER_LENGTH;
verify(frameWriter, timeout(TIME_OUT_MS)) verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));
// Get more credit, the rest data should be sent out. // Get more credit, the rest data should be sent out.
frameHandler().windowUpdate(3, Utils.DEFAULT_WINDOW_SIZE); frameHandler().windowUpdate(3, windowSize);
frameHandler().windowUpdate(0, Utils.DEFAULT_WINDOW_SIZE); frameHandler().windowUpdate(0, windowSize);
verify(frameWriter, timeout(TIME_OUT_MS)).data( verify(frameWriter, timeout(TIME_OUT_MS)).data(
eq(false), eq(3), any(Buffer.class), eq(false), eq(3), any(Buffer.class),
eq(messageLength + HEADER_LENGTH - partiallySentSize)); eq(messageLength + HEADER_LENGTH - partiallySentSize));
@ -714,6 +729,16 @@ public class OkHttpClientTransportTest {
shutdownAndVerify(); shutdownAndVerify();
} }
@Test
public void outboundFlowControl_smallWindowSize() throws Exception {
outboundFlowControl(100);
}
@Test
public void outboundFlowControl_bigWindowSize() throws Exception {
outboundFlowControl(INITIAL_WINDOW_SIZE * 2);
}
@Test @Test
public void outboundFlowControlWithInitialWindowSizeChange() throws Exception { public void outboundFlowControlWithInitialWindowSizeChange() throws Exception {
initTransport(); initTransport();
@ -1076,7 +1101,7 @@ public class OkHttpClientTransportTest {
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
int messageLength = Utils.DEFAULT_WINDOW_SIZE + 1; int messageLength = INITIAL_WINDOW_SIZE + 1;
byte[] fakeMessage = new byte[messageLength]; byte[] fakeMessage = new byte[messageLength];
Buffer buffer = createMessageFrame(fakeMessage); Buffer buffer = createMessageFrame(fakeMessage);
int messageFrameLength = (int) buffer.size(); int messageFrameLength = (int) buffer.size();
@ -1209,13 +1234,13 @@ public class OkHttpClientTransportTest {
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
Buffer buffer = createMessageFrame( Buffer buffer = createMessageFrame(
new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]); new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
frameHandler().data(false, 3, buffer, (int) buffer.size()); frameHandler().data(false, 3, buffer, (int) buffer.size());
// Should still update the connection window even stream 3 is gone. // Should still update the connection window even stream 3 is gone.
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0,
HEADER_LENGTH + Utils.DEFAULT_WINDOW_SIZE / 2 + 1); HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1);
buffer = createMessageFrame( buffer = createMessageFrame(
new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]); new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
// This should kill the connection, since we never created stream 5. // This should kill the connection, since we never created stream 5.
frameHandler().data(false, 5, buffer, (int) buffer.size()); frameHandler().data(false, 5, buffer, (int) buffer.size());
@ -1490,6 +1515,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
NO_PROXY, NO_PROXY,
tooManyPingsRunnable, tooManyPingsRunnable,
transportTracer); transportTracer);
@ -1512,6 +1538,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
NO_PROXY, NO_PROXY,
tooManyPingsRunnable, tooManyPingsRunnable,
new TransportTracer()); new TransportTracer());
@ -1542,6 +1569,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
new ProxyParameters( new ProxyParameters(
(InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
tooManyPingsRunnable, tooManyPingsRunnable,
@ -1592,6 +1620,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
new ProxyParameters( new ProxyParameters(
(InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
tooManyPingsRunnable, tooManyPingsRunnable,
@ -1641,6 +1670,7 @@ public class OkHttpClientTransportTest {
hostnameVerifier, hostnameVerifier,
ConnectionSpec.CLEARTEXT, ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
INITIAL_WINDOW_SIZE,
new ProxyParameters( new ProxyParameters(
(InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
tooManyPingsRunnable, tooManyPingsRunnable,