From 3cbd948bad335228c8d9d985d45a2fa80c9bfc29 Mon Sep 17 00:00:00 2001 From: Larry Safran <107004254+larry-safran@users.noreply.github.com> Date: Thu, 19 Jan 2023 02:27:56 +0000 Subject: [PATCH] Restore "netty:Auto adjust BDP ping frequency" with fix (#9847) * Revert "Revert "netty:Auto adjust BDP ping frequency (#9650)" (#9821)" This reverts commit a2bbe8419891891fcfdb1b7215574f96db8057c5. * Eliminate half RTT delay in sending BDP Pings when starting up. * Eliminate delay for bdp ping when current read would push us over the threshold. --- .../io/grpc/netty/AbstractNettyHandler.java | 65 +++++--- .../io/grpc/netty/NettyChannelBuilder.java | 3 +- .../io/grpc/netty/NettyClientHandler.java | 18 ++- .../io/grpc/netty/NettyClientTransport.java | 9 +- .../io/grpc/netty/NettyServerHandler.java | 19 ++- .../io/grpc/netty/NettyClientHandlerTest.java | 83 ++++++---- .../grpc/netty/NettyClientTransportTest.java | 8 +- .../io/grpc/netty/NettyHandlerTestBase.java | 143 ++++++++++++++++-- .../io/grpc/netty/NettyServerHandlerTest.java | 34 +++-- 9 files changed, 296 insertions(+), 86 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index c94c05ffaf..7f088509c0 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -16,10 +16,12 @@ package io.grpc.netty; +import static com.google.common.base.Preconditions.checkNotNull; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import io.grpc.ChannelLogger; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -44,6 +46,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private boolean autoTuneFlowControlOn; private ChannelHandlerContext ctx; private boolean initialWindowSent = false; + private final Ticker ticker; private static final long BDP_MEASUREMENT_PING = 1234; @@ -54,7 +57,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { Http2Settings initialSettings, ChannelLogger negotiationLogger, boolean autoFlowControl, - PingLimiter pingLimiter) { + PingLimiter pingLimiter, + Ticker ticker) { super(channelUnused, decoder, encoder, initialSettings, negotiationLogger); // During a graceful shutdown, wait until all streams are closed. @@ -62,12 +66,13 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { // Extract the connection window from the settings if it was set. this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : - initialSettings.initialWindowSize(); + initialSettings.initialWindowSize(); this.autoTuneFlowControlOn = autoFlowControl; if (pingLimiter == null) { pingLimiter = new AllowPingLimiter(); } this.flowControlPing = new FlowControlPinger(pingLimiter); + this.ticker = checkNotNull(ticker, "ticker"); } @Override @@ -131,14 +136,17 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { final class FlowControlPinger { private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; + public static final int MAX_BACKOFF = 10; private final PingLimiter pingLimiter; private int pingCount; private int pingReturn; private boolean pinging; private int dataSizeSincePing; - private float lastBandwidth; // bytes per second + private long lastBandwidth; // bytes per nanosecond private long lastPingTime; + private int lastTargetWindow; + private int pingFrequencyMultiplier; public FlowControlPinger(PingLimiter pingLimiter) { Preconditions.checkNotNull(pingLimiter, "pingLimiter"); @@ -157,10 +165,24 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { if (!autoTuneFlowControlOn) { return; } - if (!isPinging() && pingLimiter.isPingAllowed()) { + + // Note that we are double counting around the ping initiation as the current data will be + // added at the end of this method, so will be available in the next check. This at worst + // causes us to send a ping one data packet earlier, but makes startup faster if there are + // small packets before big ones. + int dataForCheck = getDataSincePing() + dataLength + paddingLength; + // Need to double the data here to account for targetWindow being set to twice the data below + if (!isPinging() && pingLimiter.isPingAllowed() + && dataForCheck * 2 >= lastTargetWindow * pingFrequencyMultiplier) { setPinging(true); sendPing(ctx()); } + + if (lastTargetWindow == 0) { + lastTargetWindow = + decoder().flowController().initialWindowSize(connection().connectionStream()); + } + incrementDataSincePing(dataLength + paddingLength); } @@ -169,25 +191,32 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { return; } pingReturn++; - long elapsedTime = (System.nanoTime() - lastPingTime); + setPinging(false); + + long elapsedTime = (ticker.read() - lastPingTime); if (elapsedTime == 0) { elapsedTime = 1; } + long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime; - Http2LocalFlowController fc = decoder().flowController(); // Calculate new window size by doubling the observed BDP, but cap at max window int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE); - setPinging(false); + Http2LocalFlowController fc = decoder().flowController(); int currentWindow = fc.initialWindowSize(connection().connectionStream()); - if (targetWindow > currentWindow && bandwidth > lastBandwidth) { - lastBandwidth = bandwidth; - int increase = targetWindow - currentWindow; - fc.incrementWindowSize(connection().connectionStream(), increase); - fc.initialWindowSize(targetWindow); - Http2Settings settings = new Http2Settings(); - settings.initialWindowSize(targetWindow); - frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); + if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) { + pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF); + return; } + + pingFrequencyMultiplier = 0; // react quickly when size is changing + lastBandwidth = bandwidth; + lastTargetWindow = targetWindow; + int increase = targetWindow - currentWindow; + fc.incrementWindowSize(connection().connectionStream(), increase); + fc.initialWindowSize(targetWindow); + Http2Settings settings = new Http2Settings(); + settings.initialWindowSize(targetWindow); + frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); } private boolean isPinging() { @@ -200,7 +229,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private void sendPing(ChannelHandlerContext ctx) { setDataSizeSincePing(0); - lastPingTime = System.nanoTime(); + lastPingTime = ticker.read(); encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise()); pingCount++; } @@ -229,10 +258,12 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { dataSizeSincePing = dataSize; } + // Only used in testing @VisibleForTesting void setDataSizeAndSincePing(int dataSize) { setDataSizeSincePing(dataSize); - lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1); + pingFrequencyMultiplier = 1; + lastPingTime = ticker.read() ; } } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 0b87f8ea11..da7fe84d9c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -23,6 +23,7 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Ticker; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CheckReturnValue; import com.google.errorprone.annotations.InlineMe; @@ -738,7 +739,7 @@ public final class NettyChannelBuilder extends maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(), - localSocketPicker, channelLogger, useGetForSafeMethods); + localSocketPicker, channelLogger, useGetForSafeMethods, Ticker.systemTicker()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 2fe4d65bfa..55337935e3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.base.Ticker; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.InternalChannelz; @@ -143,7 +144,8 @@ class NettyClientHandler extends AbstractNettyHandler { TransportTracer transportTracer, Attributes eagAttributes, String authority, - ChannelLogger negotiationLogger) { + ChannelLogger negotiationLogger, + Ticker ticker) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -169,7 +171,8 @@ class NettyClientHandler extends AbstractNettyHandler { transportTracer, eagAttributes, authority, - negotiationLogger); + negotiationLogger, + ticker); } @VisibleForTesting @@ -187,7 +190,8 @@ class NettyClientHandler extends AbstractNettyHandler { TransportTracer transportTracer, Attributes eagAttributes, String authority, - ChannelLogger negotiationLogger) { + ChannelLogger negotiationLogger, + Ticker ticker) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -237,7 +241,8 @@ class NettyClientHandler extends AbstractNettyHandler { eagAttributes, authority, autoFlowControl, - pingCounter); + pingCounter, + ticker); } private NettyClientHandler( @@ -253,9 +258,10 @@ class NettyClientHandler extends AbstractNettyHandler { Attributes eagAttributes, String authority, boolean autoFlowControl, - PingLimiter pingLimiter) { + PingLimiter pingLimiter, + Ticker ticker) { super(/* channelUnused= */ null, decoder, encoder, settings, - negotiationLogger, autoFlowControl, pingLimiter); + negotiationLogger, autoFlowControl, pingLimiter, ticker); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index dbfa8cf7ca..689dd847d5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -23,6 +23,7 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; @@ -102,6 +103,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final LocalSocketPicker localSocketPicker; private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; + private final Ticker ticker; NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, @@ -112,7 +114,8 @@ class NettyClientTransport implements ConnectionClientTransport { boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, - boolean useGetForSafeMethods) { + boolean useGetForSafeMethods, Ticker ticker) { + this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.negotiationScheme = this.negotiator.scheme(); this.remoteAddress = Preconditions.checkNotNull(address, "address"); @@ -137,6 +140,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.useGetForSafeMethods = useGetForSafeMethods; + this.ticker = Preconditions.checkNotNull(ticker, "ticker"); } @Override @@ -225,7 +229,8 @@ class NettyClientTransport implements ConnectionClientTransport { transportTracer, eagAttributes, authorityString, - channelLogger); + channelLogger, + ticker); ChannelHandler negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 62dd50ce65..6382471f46 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -34,6 +34,7 @@ import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORI import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.base.Ticker; import io.grpc.Attributes; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; @@ -190,7 +191,8 @@ class NettyServerHandler extends AbstractNettyHandler { maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, - eagAttributes); + eagAttributes, + Ticker.systemTicker()); } static NettyServerHandler newHandler( @@ -212,7 +214,8 @@ class NettyServerHandler extends AbstractNettyHandler { long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, - Attributes eagAttributes) { + Attributes eagAttributes, + Ticker ticker) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", flowControlWindow); @@ -245,6 +248,10 @@ class NettyServerHandler extends AbstractNettyHandler { settings.maxConcurrentStreams(maxStreams); settings.maxHeaderListSize(maxHeaderListSize); + if (ticker == null) { + ticker = Ticker.systemTicker(); + } + return new NettyServerHandler( channelUnused, connection, @@ -258,7 +265,7 @@ class NettyServerHandler extends AbstractNettyHandler { maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, keepAliveEnforcer, autoFlowControl, - eagAttributes); + eagAttributes, ticker); } private NettyServerHandler( @@ -278,9 +285,10 @@ class NettyServerHandler extends AbstractNettyHandler { long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer, boolean autoFlowControl, - Attributes eagAttributes) { + Attributes eagAttributes, + Ticker ticker) { super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(), - autoFlowControl, null); + autoFlowControl, null, ticker); final MaxConnectionIdleManager maxConnectionIdleManager; if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { @@ -325,7 +333,6 @@ class NettyServerHandler extends AbstractNettyHandler { this.transportListener = checkNotNull(transportListener, "transportListener"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracer = checkNotNull(transportTracer, "transportTracer"); - // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index d47942858a..5ec82446cd 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -54,9 +54,11 @@ import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.internal.AbstractStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientTransport; @@ -68,6 +70,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; +import io.grpc.testing.TestMethodDescriptors; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -118,7 +121,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase setKeepaliveManagerFor = ImmutableList.of("cancelShouldSucceed", @@ -136,12 +139,31 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase streamListenerMessageQueue = new LinkedList<>(); + private NettyClientStream stream; @Override protected void manualSetUp() throws Exception { setUp(); } + @Override + protected AbstractStream stream() throws Exception { + if (stream == null) { + stream = new NettyClientStream(streamTransportState, + TestMethodDescriptors.voidMethod(), + new Metadata(), + channel(), + AsciiString.of("localhost"), + AsciiString.of("http"), + AsciiString.of("agent"), + StatsTraceContext.NOOP, + transportTracer, + CallOptions.DEFAULT, + false); + } + return stream; + } + /** * Set up for test. */ @@ -201,7 +223,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(Metadata.class); verify(streamListener).headersRead(captor.capture()); @@ -323,7 +345,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(long.class); verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), any(ChannelPromise.class)); long payload = captor.getValue(); - channelRead(grpcDataFrame(3, false, contentAsArray())); + channelRead(grpcDataFrame(STREAM_ID, false, contentAsArray())); long pingData = handler().flowControlPing().payload(); channelRead(pingFrame(true, pingData)); @@ -789,18 +811,18 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { + protected static final int STREAM_ID = 3; private ByteBuf content; private EmbeddedChannel channel; @@ -328,6 +332,8 @@ public abstract class NettyHandlerTestBase { return handler().connection(); } + protected abstract AbstractStream stream() throws Exception; + @CanIgnoreReturnValue protected final ChannelFuture enqueue(WriteQueue.QueuedCommand command) { ChannelFuture future = writeQueue.enqueue(command, true); @@ -415,18 +421,15 @@ public abstract class NettyHandlerTestBase { AbstractNettyHandler handler = (AbstractNettyHandler) handler(); handler.setAutoTuneFlowControl(true); - ByteBuf data = ctx().alloc().buffer(1024); - while (data.isWritable()) { - data.writeLong(1111); - } - int length = data.readableBytes(); - ByteBuf frame = dataFrame(3, false, data.copy()); + byte[] data = initXkbBuffer(1); + int wireSize = data.length + 5; // 5 is the size of the header + ByteBuf frame = grpcDataFrame(3, false, data); channelRead(frame); - int accumulator = length; + int accumulator = wireSize; // 40 is arbitrary, any number large enough to trigger a window update would work for (int i = 0; i < 40; i++) { - channelRead(dataFrame(3, false, data.copy())); - accumulator += length; + channelRead(grpcDataFrame(3, false, data)); + accumulator += wireSize; } long pingData = handler.flowControlPing().payload(); channelRead(pingFrame(true, pingData)); @@ -444,8 +447,10 @@ public abstract class NettyHandlerTestBase { Http2Stream connectionStream = connection().connectionStream(); Http2LocalFlowController localFlowController = connection().local().flowController(); int maxWindow = handler.flowControlPing().maxWindow(); + fakeClock.forwardTime(10, TimeUnit.SECONDS); handler.flowControlPing().setDataSizeAndSincePing(maxWindow); + fakeClock.forwardTime(1, TimeUnit.SECONDS); long payload = handler.flowControlPing().payload(); channelRead(pingFrame(true, payload)); @@ -501,4 +506,124 @@ public abstract class NettyHandlerTestBase { assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE, connection().local().flowController().windowSize(connection().connectionStream())); } + + private AbstractNettyHandler setupPingTest() throws Exception { + this.flowControlWindow = 1024 * 64; + manualSetUp(); + makeStream(); + + AbstractNettyHandler handler = (AbstractNettyHandler) handler(); + handler.setAutoTuneFlowControl(true); + return handler; + } + + @Test + public void bdpPingLimitOutstanding() throws Exception { + AbstractNettyHandler handler = setupPingTest(); + long pingData = handler.flowControlPing().payload(); + + byte[] data1KbBuf = initXkbBuffer(1); + byte[] data40KbBuf = initXkbBuffer(40); + + readXCopies(1, data1KbBuf); // should initiate a ping + + readXCopies(1, data40KbBuf); // no ping, already active + fakeClock().forwardTime(20, TimeUnit.MILLISECONDS); + readPingAck(pingData); + assertEquals(1, handler.flowControlPing().getPingCount()); + assertEquals(1, handler.flowControlPing().getPingReturn()); + + readXCopies(4, data40KbBuf); // initiate ping + assertEquals(2, handler.flowControlPing().getPingCount()); + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + readPingAck(pingData); + + readXCopies(1, data1KbBuf); // ping again since had 160K data since last ping started + assertEquals(3, handler.flowControlPing().getPingCount()); + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + readPingAck(pingData); + + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + readXCopies(1, data1KbBuf); // no ping, too little data + assertEquals(3, handler.flowControlPing().getPingCount()); + } + + @Test + public void testPingBackoff() throws Exception { + AbstractNettyHandler handler = setupPingTest(); + long pingData = handler.flowControlPing().payload(); + byte[] data40KbBuf = initXkbBuffer(40); + + handler.flowControlPing().setDataSizeAndSincePing(200000); + + for (int i = 0; i <= 10; i++) { + int beforeCount = handler.flowControlPing().getPingCount(); + // should resize on 0 + readXCopies(6, data40KbBuf); // initiate ping on i= {0, 1, 3, 6, 10} + int afterCount = handler.flowControlPing().getPingCount(); + fakeClock().forwardNanos(200); + if (afterCount > beforeCount) { + readPingAck(pingData); // should increase backoff multiplier + } + } + assertEquals(6, handler.flowControlPing().getPingCount()); + } + + @Test + public void bdpPingWindowResizing() throws Exception { + this.flowControlWindow = 1024 * 8; + manualSetUp(); + makeStream(); + + AbstractNettyHandler handler = (AbstractNettyHandler) handler(); + handler.setAutoTuneFlowControl(true); + Http2LocalFlowController localFlowController = connection().local().flowController(); + long pingData = handler.flowControlPing().payload(); + int initialWindowSize = localFlowController.initialWindowSize(); + byte[] data1Kb = initXkbBuffer(1); + byte[] data10Kb = initXkbBuffer(10); + + readXCopies(1, data1Kb); // initiate ping + fakeClock().forwardNanos(2); + readPingAck(pingData); // should not resize window because of small target window + assertEquals(initialWindowSize, localFlowController.initialWindowSize()); + + readXCopies(2, data10Kb); // initiate ping on first + fakeClock().forwardNanos(200); + readPingAck(pingData); // should resize window + int windowSizeA = localFlowController.initialWindowSize(); + Assert.assertNotEquals(initialWindowSize, windowSizeA); + + readXCopies(3, data10Kb); // initiate ping w/ first 10K packet + fakeClock().forwardNanos(5000); + readPingAck(pingData); // should not resize window as bandwidth didn't increase + Assert.assertEquals(windowSizeA, localFlowController.initialWindowSize()); + + readXCopies(6, data10Kb); // initiate ping with fist packet + fakeClock().forwardNanos(100); + readPingAck(pingData); // should resize window + int windowSizeB = localFlowController.initialWindowSize(); + Assert.assertNotEquals(windowSizeA, windowSizeB); + } + + private void readPingAck(long pingData) throws Exception { + channelRead(pingFrame(true, pingData)); + } + + private void readXCopies(int copies, byte[] data) throws Exception { + for (int i = 0; i < copies; i++) { + channelRead(grpcDataFrame(STREAM_ID, false, data)); // buffer it + stream().request(1); // consume it + } + } + + private byte[] initXkbBuffer(int multiple) { + ByteBuffer data = ByteBuffer.allocate(1024 * multiple); + + for (int i = 0; i < multiple * 1024 / 4; i++) { + data.putInt(4 * i, 1111); + } + return data.array(); + } + } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 72c267a482..926ce8261a 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -60,6 +60,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StreamTracer; +import io.grpc.internal.AbstractStream; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveEnforcer; import io.grpc.internal.KeepAliveManager; @@ -112,8 +113,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase