diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index d41daecd54..a9e18c992e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -61,6 +61,7 @@ import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; +import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; @@ -78,6 +79,7 @@ import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamVisitor; import io.netty.handler.codec.http2.StreamBufferingEncoder; +import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; @@ -121,6 +123,11 @@ class NettyClientHandler extends AbstractNettyHandler { Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); Http2Connection connection = new DefaultHttp2Connection(false); + WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); + dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. + DefaultHttp2RemoteFlowController controller = + new DefaultHttp2RemoteFlowController(connection, dist); + connection.remote().flowController(controller); return newHandler(connection, frameReader, frameWriter, lifecycleManager, keepAliveManager, flowControlWindow, maxHeaderListSize, ticker, tooManyPingsRunnable); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 4cb787dee5..08a8583339 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -69,6 +69,7 @@ import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; +import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; @@ -87,6 +88,7 @@ import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamVisitor; +import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; @@ -180,6 +182,11 @@ class NettyServerHandler extends AbstractNettyHandler { Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive"); final Http2Connection connection = new DefaultHttp2Connection(true); + WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); + dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. + DefaultHttp2RemoteFlowController controller = + new DefaultHttp2RemoteFlowController(connection, dist); + connection.remote().flowController(controller); final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer( permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);