From efbb65522bd12c0fb0f7a605de52bbd7ba83f683 Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 30 Jun 2015 12:10:47 -0700 Subject: [PATCH] Simplifying flow control window config for Netty. Fixes #494 --- .../benchmarks/netty/AbstractBenchmark.java | 6 ++-- .../io/grpc/benchmarks/qps/AsyncClient.java | 5 ++- .../io/grpc/benchmarks/qps/AsyncServer.java | 3 +- .../benchmarks/qps/ClientConfiguration.java | 16 +++------ .../grpc/benchmarks/qps/OpenLoopClient.java | 5 ++- .../benchmarks/qps/ServerConfiguration.java | 16 +++------ .../java/io/grpc/benchmarks/qps/Utils.java | 3 +- .../transport/netty/NettyChannelBuilder.java | 33 ++++++------------- .../transport/netty/NettyClientHandler.java | 22 ++++++------- .../transport/netty/NettyClientTransport.java | 11 +++---- .../io/grpc/transport/netty/NettyServer.java | 15 ++++----- .../transport/netty/NettyServerBuilder.java | 31 +++++------------ .../transport/netty/NettyServerHandler.java | 17 +++++----- .../transport/netty/NettyServerTransport.java | 10 +++--- .../netty/NettyClientHandlerTest.java | 22 ++++++------- .../netty/NettyClientTransportTest.java | 4 +-- .../netty/NettyServerHandlerTest.java | 17 +++++----- .../okhttp/OkHttpChannelBuilder.java | 2 +- 18 files changed, 90 insertions(+), 148 deletions(-) diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java index 5a12a51e14..b3f6f3dead 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java @@ -207,10 +207,8 @@ public abstract class AbstractBenchmark { serverBuilder.workerEventLoopGroup(new NioEventLoopGroup()); // Always set connection and stream window size to same value - serverBuilder.connectionWindowSize(windowSize.bytes()); - serverBuilder.streamWindowSize(windowSize.bytes()); - channelBuilder.connectionWindowSize(windowSize.bytes()); - channelBuilder.streamWindowSize(windowSize.bytes()); + serverBuilder.flowControlWindow(windowSize.bytes()); + channelBuilder.flowControlWindow(windowSize.bytes()); channelBuilder.negotiationType(NegotiationType.PLAINTEXT); serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentStreams); diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java index ffc320c215..ad1a78949c 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java @@ -34,14 +34,13 @@ package io.grpc.benchmarks.qps; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD; -import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION; +import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS; -import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT; @@ -327,7 +326,7 @@ public class AsyncClient { ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder( ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR, - SAVE_HISTOGRAM, STREAMING_RPCS, CONNECTION_WINDOW, STREAM_WINDOW); + SAVE_HISTOGRAM, STREAMING_RPCS, FLOW_CONTROL_WINDOW); ClientConfiguration config; try { config = configBuilder.build(args); diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java index abb8e46c0a..3c4b9cd5ba 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java @@ -168,8 +168,7 @@ public class AsyncServer { .addService(TestServiceGrpc.bindService(new TestServiceImpl())) .sslContext(sslContext) .executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null) - .connectionWindowSize(config.connectionWindow) - .streamWindowSize(config.streamWindow) + .flowControlWindow(config.flowControlWindow) .build(); } diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java index b8b709707d..0f4d0046a9 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java @@ -66,8 +66,7 @@ class ClientConfiguration implements Configuration { int outstandingRpcsPerChannel = 10; int serverPayload; int clientPayload; - int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE; - int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE; + int flowControlWindow = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW; // seconds int duration = 60; // seconds @@ -271,18 +270,11 @@ class ClientConfiguration implements Configuration { config.rpcType = STREAMING; } }, - CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.", - "" + DEFAULT.connectionWindow) { + FLOW_CONTROL_WINDOW("BYTES", "The HTTP/2 flow control window.", + "" + DEFAULT.flowControlWindow) { @Override protected void setClientValue(ClientConfiguration config, String value) { - config.connectionWindow = parseInt(value); - } - }, - STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.", - "" + DEFAULT.streamWindow) { - @Override - protected void setClientValue(ClientConfiguration config, String value) { - config.streamWindow = parseInt(value); + config.flowControlWindow = parseInt(value); } }, TARGET_QPS("INT", "Average number of QPS to shoot for.", "" + DEFAULT.targetQps, true) { diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java index 327089a9cc..d444c1c682 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java @@ -34,11 +34,10 @@ package io.grpc.benchmarks.qps; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD; -import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION; +import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD; -import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TARGET_QPS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS; @@ -86,7 +85,7 @@ public class OpenLoopClient { public static void main(String... args) throws Exception { ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder( ADDRESS, TARGET_QPS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, - TESTCA, TRANSPORT, DURATION, SAVE_HISTOGRAM, CONNECTION_WINDOW, STREAM_WINDOW); + TESTCA, TRANSPORT, DURATION, SAVE_HISTOGRAM, FLOW_CONTROL_WINDOW); ClientConfiguration config; try { config = configBuilder.build(args); diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ServerConfiguration.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ServerConfiguration.java index a153609e57..c9fee0272b 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ServerConfiguration.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ServerConfiguration.java @@ -57,8 +57,7 @@ class ServerConfiguration implements Configuration { boolean tls; boolean directExecutor; SocketAddress address; - int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE; - int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE; + int flowControlWindow = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW; private ServerConfiguration() { } @@ -186,18 +185,11 @@ class ServerConfiguration implements Configuration { config.directExecutor = parseBoolean(value); } }, - CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.", - "" + DEFAULT.connectionWindow) { + FLOW_CONTROL_WINDOW("BYTES", "The HTTP/2 flow control window.", + "" + DEFAULT.flowControlWindow) { @Override protected void setServerValue(ServerConfiguration config, String value) { - config.connectionWindow = parseInt(value); - } - }, - STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.", - "" + DEFAULT.streamWindow) { - @Override - protected void setServerValue(ServerConfiguration config, String value) { - config.streamWindow = parseInt(value); + config.flowControlWindow = parseInt(value); } }; diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Utils.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Utils.java index 2880b4c18d..2a5a5e517a 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Utils.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Utils.java @@ -176,8 +176,7 @@ final class Utils { .negotiationType(negotiationType) .executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null) .sslContext(context) - .connectionWindowSize(config.connectionWindow) - .streamWindowSize(config.streamWindow) + .flowControlWindow(config.flowControlWindow) .build(); } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/transport/netty/NettyChannelBuilder.java index f357345ef1..56c3f975af 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyChannelBuilder.java @@ -37,10 +37,10 @@ import io.grpc.AbstractChannelBuilder; import io.grpc.SharedResourceHolder; import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransportFactory; + import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; @@ -52,16 +52,14 @@ import javax.net.ssl.SSLException; * A builder to help simplify construction of channels using the Netty transport. */ public final class NettyChannelBuilder extends AbstractChannelBuilder { - public static final int DEFAULT_CONNECTION_WINDOW_SIZE = 1048576; // 1MiB - public static final int DEFAULT_STREAM_WINDOW_SIZE = Http2CodecUtil.DEFAULT_WINDOW_SIZE; + public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB private final SocketAddress serverAddress; private NegotiationType negotiationType = NegotiationType.TLS; private Class channelType = NioSocketChannel.class; private EventLoopGroup userEventLoopGroup; private SslContext sslContext; - private int connectionWindowSize = DEFAULT_CONNECTION_WINDOW_SIZE; - private int streamWindowSize = DEFAULT_STREAM_WINDOW_SIZE; + private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; /** * Creates a new builder with the given server address. @@ -123,22 +121,12 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder 0, "connectionWindowSize must be positive"); - this.connectionWindowSize = connectionWindowSize; - return this; - } - - /** - * Sets the HTTP/2 per-stream flow control window. If not called, the default value - * is {@link #DEFAULT_STREAM_WINDOW_SIZE}). - */ - public NettyChannelBuilder streamWindowSize(int streamWindowSize) { - Preconditions.checkArgument(streamWindowSize > 0, "streamWindowSize must be positive"); - this.streamWindowSize = streamWindowSize; + public NettyChannelBuilder flowControlWindow(int flowControlWindow) { + Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); + this.flowControlWindow = flowControlWindow; return this; } @@ -148,8 +136,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder channelType = this.channelType; - final int connectionWindowSize = this.connectionWindowSize; - final int streamWindowSize = this.streamWindowSize; + final int flowControlWindow = this.flowControlWindow; final ProtocolNegotiator negotiator; switch (negotiationType) { case PLAINTEXT: @@ -179,7 +166,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder 0, "connectionWindowSize must be positive"); - this.connectionWindowSize = connectionWindowSize; + Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); + this.flowControlWindow = flowControlWindow; initListener(); @@ -121,7 +121,7 @@ class NettyClientHandler extends Http2ConnectionHandler { // frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS // to the super class constructor. initialSettings.pushEnabled(false); - initialSettings.initialWindowSize(streamWindowSize); + initialSettings.initialWindowSize(flowControlWindow); initialSettings.maxConcurrentStreams(0); } @@ -261,7 +261,7 @@ class NettyClientHandler extends Http2ConnectionHandler { Http2Stream stream = connection().stream(http2Ex.streamId()); if (stream != null) { clientStream(stream).transportReportStatus(Status.fromThrowable(cause), false, - new Metadata.Trailers()); + new Metadata.Trailers()); } // Delegate to the base class to send a RST_STREAM. @@ -480,13 +480,13 @@ class NettyClientHandler extends Http2ConnectionHandler { } // Send the initial connection window if different than the default. - if (connectionWindowSize > 0) { + if (flowControlWindow > 0) { needToFlush = true; Http2Stream connectionStream = connection().connectionStream(); int currentSize = connection().local().flowController().windowSize(connectionStream); - int delta = connectionWindowSize - currentSize; + int delta = flowControlWindow - currentSize; decoder().flowController().incrementWindowSize(ctx, connectionStream, delta); - connectionWindowSize = -1; + flowControlWindow = -1; } if (needToFlush) { diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index c11bd0961f..e7a8ad600f 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -82,8 +82,7 @@ class NettyClientTransport implements ClientTransport { private final ProtocolNegotiator.Handler negotiationHandler; private final NettyClientHandler handler; private final AsciiString authority; - private final int connectionWindowSize; - private final int streamWindowSize; + private final int flowControlWindow; // We should not send on the channel until negotiation completes. This is a hard requirement // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well. private Channel channel; @@ -97,13 +96,12 @@ class NettyClientTransport implements ClientTransport { NettyClientTransport(SocketAddress address, Class channelType, EventLoopGroup group, ProtocolNegotiator negotiator, - int connectionWindowSize, int streamWindowSize) { + int flowControlWindow) { Preconditions.checkNotNull(negotiator, "negotiator"); this.address = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); this.channelType = Preconditions.checkNotNull(channelType, "channelType"); - this.connectionWindowSize = connectionWindowSize; - this.streamWindowSize = streamWindowSize; + this.flowControlWindow = flowControlWindow; if (address instanceof InetSocketAddress) { InetSocketAddress inetAddress = (InetSocketAddress) address; @@ -232,7 +230,6 @@ class NettyClientTransport implements ClientTransport { BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder( new DefaultHttp2ConnectionEncoder(connection, frameWriter)); - return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, - streamWindowSize); + return new NettyClientHandler(encoder, connection, frameReader, flowControlWindow); } } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServer.java b/netty/src/main/java/io/grpc/transport/netty/NettyServer.java index 5279f37f04..1b0cdf72f4 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServer.java @@ -69,27 +69,25 @@ public class NettyServer implements Server { private final int maxStreamsPerConnection; private ServerListener listener; private Channel channel; - private int connectionWindowSize; - private int streamWindowSize; + private int flowControlWindow; NettyServer(SocketAddress address, Class channelType, EventLoopGroup bossGroup, EventLoopGroup workerGroup, int maxStreamsPerConnection, - int connectionWindowSize, int streamWindowSize) { + int flowControlWindow) { this(address, channelType, bossGroup, workerGroup, null, maxStreamsPerConnection, - connectionWindowSize, streamWindowSize); + flowControlWindow); } NettyServer(SocketAddress address, Class channelType, EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable SslContext sslContext, - int maxStreamsPerConnection, int connectionWindowSize, int streamWindowSize) { + int maxStreamsPerConnection, int flowControlWindow) { this.address = address; this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup"); this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup"); this.sslContext = sslContext; this.maxStreamsPerConnection = maxStreamsPerConnection; - this.connectionWindowSize = connectionWindowSize; - this.streamWindowSize = streamWindowSize; + this.flowControlWindow = flowControlWindow; } @Override @@ -106,8 +104,7 @@ public class NettyServer implements Server { @Override public void initChannel(Channel ch) throws Exception { NettyServerTransport transport - = new NettyServerTransport(ch, sslContext, maxStreamsPerConnection, - connectionWindowSize, streamWindowSize); + = new NettyServerTransport(ch, sslContext, maxStreamsPerConnection, flowControlWindow); transport.start(listener.transportCreated(transport)); } }); diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java index 435ad817da..209ac5444e 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java @@ -36,10 +36,10 @@ import com.google.common.base.Preconditions; import io.grpc.AbstractServerBuilder; import io.grpc.HandlerRegistry; import io.grpc.SharedResourceHolder; + import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; @@ -49,8 +49,7 @@ import java.net.SocketAddress; * A builder to help simplify the construction of a Netty-based GRPC server. */ public final class NettyServerBuilder extends AbstractServerBuilder { - public static final int DEFAULT_CONNECTION_WINDOW_SIZE = 1048576; // 1MiB - public static final int DEFAULT_STREAM_WINDOW_SIZE = Http2CodecUtil.DEFAULT_WINDOW_SIZE; + public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB private final SocketAddress address; private Class channelType = NioServerSocketChannel.class; @@ -58,8 +57,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder 0, "connectionWindowSize must be positive"); - this.connectionWindowSize = connectionWindowSize; - return this; - } - - /** - * Sets the HTTP/2 per-stream flow control window. If not called, the default value - * is {@link #DEFAULT_STREAM_WINDOW_SIZE}). - */ - public NettyServerBuilder streamWindowSize(int streamWindowSize) { - Preconditions.checkArgument(streamWindowSize > 0, "streamWindowSize must be positive"); - this.streamWindowSize = streamWindowSize; + public NettyServerBuilder flowControlWindow(int flowControlWindow) { + Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); + this.flowControlWindow = flowControlWindow; return this; } @@ -207,8 +195,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder 0, "connectionWindowSize must be positive"); - this.connectionWindowSize = connectionWindowSize; + Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); + this.flowControlWindow = flowControlWindow; streamKey = connection.newKey(); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); @@ -105,7 +104,7 @@ class NettyServerHandler extends Http2ConnectionHandler { // TODO(nmittler): this is a temporary hack as we currently have to send a 2nd SETTINGS // frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS // to the super class constructor. - initialSettings.initialWindowSize(streamWindowSize); + initialSettings.initialWindowSize(flowControlWindow); initialSettings.maxConcurrentStreams(maxStreams); } @@ -351,13 +350,13 @@ class NettyServerHandler extends Http2ConnectionHandler { } // Send the initial connection window if different than the default. - if (connectionWindowSize > 0) { + if (flowControlWindow > 0) { needToFlush = true; Http2Stream connectionStream = connection().connectionStream(); int currentSize = connection().local().flowController().windowSize(connectionStream); - int delta = connectionWindowSize - currentSize; + int delta = flowControlWindow - currentSize; decoder().flowController().incrementWindowSize(ctx, connectionStream, delta); - connectionWindowSize = -1; + flowControlWindow = -1; } if (needToFlush) { diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java index 83dc0fbd3d..f3b31f5c14 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java @@ -67,16 +67,14 @@ class NettyServerTransport implements ServerTransport { private final int maxStreams; private ServerTransportListener listener; private boolean terminated; - private int connectionWindowSize; - private int streamWindowSize; + private int flowControlWindow; NettyServerTransport(Channel channel, @Nullable SslContext sslContext, int maxStreams, - int connectionWindowSize, int streamWindowSize) { + int flowControlWindow) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.sslContext = sslContext; this.maxStreams = maxStreams; - this.connectionWindowSize = connectionWindowSize; - this.streamWindowSize = streamWindowSize; + this.flowControlWindow = flowControlWindow; } public void start(ServerTransportListener listener) { @@ -137,6 +135,6 @@ class NettyServerTransport implements ServerTransport { new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, - maxStreams, connectionWindowSize, streamWindowSize); + maxStreams, flowControlWindow); } } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java index 1d3add0b08..4cf286e358 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java @@ -67,6 +67,7 @@ import io.grpc.Status.Code; import io.grpc.StatusException; import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport.PingCallback; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; @@ -129,7 +130,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { frameWriter = new DefaultHttp2FrameWriter(); frameReader = new DefaultHttp2FrameReader(); - handler = newHandler(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); + handler = newHandler(DEFAULT_WINDOW_SIZE); content = Unpooled.copiedBuffer("hello world", UTF_8); when(channel.isActive()).thenReturn(true); @@ -314,7 +315,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { assertTrue(promise.isDone()); assertFalse(promise.isSuccess()); verify(stream).transportReportStatus(any(Status.class), eq(false), - notNull(Metadata.Trailers.class)); + notNull(Metadata.Trailers.class)); } @Test @@ -323,13 +324,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { // Read a GOAWAY that indicates our stream was never processed by the server. handler.channelRead(ctx, - goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); + goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(stream).transportReportStatus(captor.capture(), eq(false), - notNull(Metadata.Trailers.class)); + notNull(Metadata.Trailers.class)); assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); assertEquals("HTTP/2 error code: CANCEL\nthis is a test", - captor.getValue().getDescription()); + captor.getValue().getDescription()); } @Test @@ -358,7 +359,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { when(stream.id()).thenReturn(3); writeQueue.enqueue(new CancelStreamCommand(stream), true); verify(stream).transportReportStatus(eq(Status.CANCELLED), eq(true), - any(Metadata.Trailers.class)); + any(Metadata.Trailers.class)); } @Test @@ -379,14 +380,14 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); InOrder inOrder = inOrder(stream); inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(), eq(false), - notNull(Metadata.Trailers.class)); + notNull(Metadata.Trailers.class)); assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); } @Test public void connectionWindowShouldBeOverridden() throws Exception { int connectionWindow = 1048576; // 1MiB - handler = newHandler(connectionWindow, DEFAULT_WINDOW_SIZE); + handler = newHandler(connectionWindow); handler.handlerAdded(ctx); Http2Stream connectionStream = handler.connection().connectionStream(); Http2FlowController localFlowController = handler.connection().local().flowController(); @@ -534,7 +535,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { mockContext(); } - private NettyClientHandler newHandler(int connectionWindowSize, int streamWindowSize) { + private NettyClientHandler newHandler(int connectionWindowSize) { Http2Connection connection = new DefaultHttp2Connection(false); Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); @@ -546,8 +547,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { return nanoTime; } }; - return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, - streamWindowSize, ticker); + return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, ticker); } private AsciiString as(String string) { diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java index 3ced8697c3..8c62b39d99 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java @@ -182,7 +182,7 @@ public class NettyClientTransportTest { private NettyClientTransport newTransport(ProtocolNegotiator negotiator) { NettyClientTransport transport = new NettyClientTransport(address, NioSocketChannel.class, - group, negotiator, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); + group, negotiator, DEFAULT_WINDOW_SIZE); transports.add(transport); return transport; } @@ -198,7 +198,7 @@ public class NettyClientTransportTest { .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); server = new NettyServer(address, NioServerSocketChannel.class, group, group, serverContext, maxStreamsPerConnection, - DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); + DEFAULT_WINDOW_SIZE); server.start(serverListener); } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java index 205066734f..d37607b0e6 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java @@ -301,7 +301,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { Http2Connection connection = new DefaultHttp2Connection(true); handler = new NettyServerHandler(transportListener, connection, new DefaultHttp2FrameReader(), - frameWriter, maxConcurrentStreams, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); + frameWriter, maxConcurrentStreams, DEFAULT_WINDOW_SIZE); when(channel.isActive()).thenReturn(true); mockContext(); @@ -324,15 +324,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { @Test public void connectionWindowShouldBeOverridden() throws Exception { - int connectionWindow = 1048576; // 1MiB - handler = newHandler(transportListener, connectionWindow, DEFAULT_WINDOW_SIZE); + int flowControlWindow = 1048576; // 1MiB + handler = newHandler(transportListener, flowControlWindow); handler.handlerAdded(ctx); Http2Stream connectionStream = handler.connection().connectionStream(); Http2FlowController localFlowController = handler.connection().local().flowController(); int actualInitialWindowSize = localFlowController.initialWindowSize(connectionStream); int actualWindowSize = localFlowController.windowSize(connectionStream); - assertEquals(connectionWindow, actualWindowSize); - assertEquals(connectionWindow, actualInitialWindowSize); + assertEquals(flowControlWindow, actualWindowSize); + assertEquals(flowControlWindow, actualInitialWindowSize); } private void createStream() throws Exception { @@ -388,16 +388,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { } private static NettyServerHandler newHandler(ServerTransportListener transportListener, - int connectionWindowSize, - int streamWindowSize) { + int flowControlWindow) { Http2Connection connection = new DefaultHttp2Connection(true); Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, - Integer.MAX_VALUE, connectionWindowSize, streamWindowSize); + Integer.MAX_VALUE, flowControlWindow); } private static NettyServerHandler newHandler(ServerTransportListener transportListener) { - return newHandler(transportListener, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); + return newHandler(transportListener, DEFAULT_WINDOW_SIZE); } } diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpChannelBuilder.java index 234f5cbd9e..53f415534e 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpChannelBuilder.java @@ -36,8 +36,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.squareup.okhttp.CipherSuite; import com.squareup.okhttp.ConnectionSpec; - import com.squareup.okhttp.TlsVersion; + import io.grpc.AbstractChannelBuilder; import io.grpc.SharedResourceHolder; import io.grpc.SharedResourceHolder.Resource;