mirror of https://github.com/grpc/grpc-java.git
okhttp: settings acks back after apply settings before sending any data (#4825)
okhttp: setting acks back after apply settings before sending any data as a result of the change. Resolves #4809 also, make #4816 the not flaky.
This commit is contained in:
parent
2fca42feb9
commit
554210da2a
|
|
@ -1070,6 +1070,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
|
|||
|
||||
@Override
|
||||
public void settings(boolean clearPrevious, Settings settings) {
|
||||
boolean outboundWindowSizeIncreased = false;
|
||||
synchronized (lock) {
|
||||
if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
|
||||
int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
|
||||
|
|
@ -1080,16 +1081,24 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
|
|||
if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
|
||||
int initialWindowSize = OkHttpSettingsUtil.get(
|
||||
settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
|
||||
outboundFlow.initialOutboundWindowSize(initialWindowSize);
|
||||
outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
|
||||
}
|
||||
if (firstSettings) {
|
||||
listener.transportReady();
|
||||
firstSettings = false;
|
||||
}
|
||||
|
||||
// The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
|
||||
// writes due to update in settings must be sent after SETTINGS acknowledgment frame,
|
||||
// otherwise it will cause a stream error (RST_STREAM).
|
||||
frameWriter.ackSettings(settings);
|
||||
|
||||
// send any pending bytes / streams
|
||||
if (outboundWindowSizeIncreased) {
|
||||
outboundFlow.writeStreams();
|
||||
}
|
||||
startPendingStreams();
|
||||
}
|
||||
|
||||
frameWriter.ackSettings(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -46,9 +46,15 @@ class OutboundFlowController {
|
|||
}
|
||||
|
||||
/**
|
||||
* Must be called with holding transport lock.
|
||||
* Adjusts outbound window size requested by peer. When window size is increased, it does not send
|
||||
* any pending frames. If this method returns {@code true}, the caller should call {@link
|
||||
* #writeStreams()} after settings ack.
|
||||
*
|
||||
* <p>Must be called with holding transport lock.
|
||||
*
|
||||
* @return true, if new window size is increased, false otherwise.
|
||||
*/
|
||||
void initialOutboundWindowSize(int newWindowSize) {
|
||||
boolean initialOutboundWindowSize(int newWindowSize) {
|
||||
if (newWindowSize < 0) {
|
||||
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
|
||||
}
|
||||
|
|
@ -66,10 +72,7 @@ class OutboundFlowController {
|
|||
}
|
||||
}
|
||||
|
||||
if (delta > 0) {
|
||||
// The window size increased, send any pending frames for all streams.
|
||||
writeStreams();
|
||||
}
|
||||
return delta > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -163,8 +166,10 @@ class OutboundFlowController {
|
|||
|
||||
/**
|
||||
* Writes as much data for all the streams as possible given the current flow control windows.
|
||||
*
|
||||
* <p>Must be called with holding transport lock.
|
||||
*/
|
||||
private void writeStreams() {
|
||||
void writeStreams() {
|
||||
OkHttpClientStream[] streams = transport.getActiveStreams();
|
||||
int connectionWindow = connectionState.window();
|
||||
for (int numStreams = streams.length; numStreams > 0 && connectionWindow > 0;) {
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import static org.mockito.Matchers.anyInt;
|
|||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
|
|
@ -113,6 +114,7 @@ import org.junit.rules.Timeout;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
|
@ -758,6 +760,40 @@ public class OkHttpClientTransportTest {
|
|||
shutdownAndVerify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream =
|
||||
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
|
||||
stream.start(listener);
|
||||
int messageLength = 20;
|
||||
setInitialWindowSize(HEADER_LENGTH + 10);
|
||||
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
|
||||
stream.writeMessage(input);
|
||||
stream.flush();
|
||||
// part of the message can be sent.
|
||||
verify(frameWriter, timeout(TIME_OUT_MS))
|
||||
.data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
|
||||
// Avoid connection flow control.
|
||||
frameHandler().windowUpdate(0, HEADER_LENGTH + 20);
|
||||
|
||||
// Increase initial window size
|
||||
setInitialWindowSize(HEADER_LENGTH + 20);
|
||||
|
||||
// wait until pending frames sent (inOrder doesn't support timeout)
|
||||
verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce())
|
||||
.data(eq(false), eq(3), any(Buffer.class), eq(10));
|
||||
// It should ack the settings, then send remaining message.
|
||||
InOrder inOrder = inOrder(frameWriter);
|
||||
inOrder.verify(frameWriter).ackSettings(any(Settings.class));
|
||||
inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10));
|
||||
|
||||
stream.cancel(Status.CANCELLED);
|
||||
listener.waitUntilStreamClosed();
|
||||
shutdownAndVerify();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stopNormally() throws Exception {
|
||||
initTransport();
|
||||
|
|
|
|||
Loading…
Reference in New Issue