Simplifying flow control window config for Netty.

Fixes #494
This commit is contained in:
nmittler 2015-06-30 12:10:47 -07:00
parent 35ff624eb2
commit efbb65522b
18 changed files with 90 additions and 148 deletions

View File

@ -207,10 +207,8 @@ public abstract class AbstractBenchmark {
serverBuilder.workerEventLoopGroup(new NioEventLoopGroup()); serverBuilder.workerEventLoopGroup(new NioEventLoopGroup());
// Always set connection and stream window size to same value // Always set connection and stream window size to same value
serverBuilder.connectionWindowSize(windowSize.bytes()); serverBuilder.flowControlWindow(windowSize.bytes());
serverBuilder.streamWindowSize(windowSize.bytes()); channelBuilder.flowControlWindow(windowSize.bytes());
channelBuilder.connectionWindowSize(windowSize.bytes());
channelBuilder.streamWindowSize(windowSize.bytes());
channelBuilder.negotiationType(NegotiationType.PLAINTEXT); channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentStreams); serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentStreams);

View File

@ -34,14 +34,13 @@ package io.grpc.benchmarks.qps;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
@ -327,7 +326,7 @@ public class AsyncClient {
ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder( ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD, ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD,
TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR, TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR,
SAVE_HISTOGRAM, STREAMING_RPCS, CONNECTION_WINDOW, STREAM_WINDOW); SAVE_HISTOGRAM, STREAMING_RPCS, FLOW_CONTROL_WINDOW);
ClientConfiguration config; ClientConfiguration config;
try { try {
config = configBuilder.build(args); config = configBuilder.build(args);

View File

@ -168,8 +168,7 @@ public class AsyncServer {
.addService(TestServiceGrpc.bindService(new TestServiceImpl())) .addService(TestServiceGrpc.bindService(new TestServiceImpl()))
.sslContext(sslContext) .sslContext(sslContext)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null) .executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.connectionWindowSize(config.connectionWindow) .flowControlWindow(config.flowControlWindow)
.streamWindowSize(config.streamWindow)
.build(); .build();
} }

View File

@ -66,8 +66,7 @@ class ClientConfiguration implements Configuration {
int outstandingRpcsPerChannel = 10; int outstandingRpcsPerChannel = 10;
int serverPayload; int serverPayload;
int clientPayload; int clientPayload;
int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE; int flowControlWindow = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE;
// seconds // seconds
int duration = 60; int duration = 60;
// seconds // seconds
@ -271,18 +270,11 @@ class ClientConfiguration implements Configuration {
config.rpcType = STREAMING; config.rpcType = STREAMING;
} }
}, },
CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.", FLOW_CONTROL_WINDOW("BYTES", "The HTTP/2 flow control window.",
"" + DEFAULT.connectionWindow) { "" + DEFAULT.flowControlWindow) {
@Override @Override
protected void setClientValue(ClientConfiguration config, String value) { protected void setClientValue(ClientConfiguration config, String value) {
config.connectionWindow = parseInt(value); config.flowControlWindow = parseInt(value);
}
},
STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.",
"" + DEFAULT.streamWindow) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.streamWindow = parseInt(value);
} }
}, },
TARGET_QPS("INT", "Average number of QPS to shoot for.", "" + DEFAULT.targetQps, true) { TARGET_QPS("INT", "Average number of QPS to shoot for.", "" + DEFAULT.targetQps, true) {

View File

@ -34,11 +34,10 @@ package io.grpc.benchmarks.qps;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.FLOW_CONTROL_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TARGET_QPS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TARGET_QPS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS; import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
@ -86,7 +85,7 @@ public class OpenLoopClient {
public static void main(String... args) throws Exception { public static void main(String... args) throws Exception {
ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder( ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
ADDRESS, TARGET_QPS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, ADDRESS, TARGET_QPS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS,
TESTCA, TRANSPORT, DURATION, SAVE_HISTOGRAM, CONNECTION_WINDOW, STREAM_WINDOW); TESTCA, TRANSPORT, DURATION, SAVE_HISTOGRAM, FLOW_CONTROL_WINDOW);
ClientConfiguration config; ClientConfiguration config;
try { try {
config = configBuilder.build(args); config = configBuilder.build(args);

View File

@ -57,8 +57,7 @@ class ServerConfiguration implements Configuration {
boolean tls; boolean tls;
boolean directExecutor; boolean directExecutor;
SocketAddress address; SocketAddress address;
int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE; int flowControlWindow = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE;
private ServerConfiguration() { private ServerConfiguration() {
} }
@ -186,18 +185,11 @@ class ServerConfiguration implements Configuration {
config.directExecutor = parseBoolean(value); config.directExecutor = parseBoolean(value);
} }
}, },
CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.", FLOW_CONTROL_WINDOW("BYTES", "The HTTP/2 flow control window.",
"" + DEFAULT.connectionWindow) { "" + DEFAULT.flowControlWindow) {
@Override @Override
protected void setServerValue(ServerConfiguration config, String value) { protected void setServerValue(ServerConfiguration config, String value) {
config.connectionWindow = parseInt(value); config.flowControlWindow = parseInt(value);
}
},
STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.",
"" + DEFAULT.streamWindow) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.streamWindow = parseInt(value);
} }
}; };

View File

@ -176,8 +176,7 @@ final class Utils {
.negotiationType(negotiationType) .negotiationType(negotiationType)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null) .executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.sslContext(context) .sslContext(context)
.connectionWindowSize(config.connectionWindow) .flowControlWindow(config.flowControlWindow)
.streamWindowSize(config.streamWindow)
.build(); .build();
} }

View File

@ -37,10 +37,10 @@ import io.grpc.AbstractChannelBuilder;
import io.grpc.SharedResourceHolder; import io.grpc.SharedResourceHolder;
import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport;
import io.grpc.transport.ClientTransportFactory; import io.grpc.transport.ClientTransportFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -52,16 +52,14 @@ import javax.net.ssl.SSLException;
* A builder to help simplify construction of channels using the Netty transport. * A builder to help simplify construction of channels using the Netty transport.
*/ */
public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChannelBuilder> { public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChannelBuilder> {
public static final int DEFAULT_CONNECTION_WINDOW_SIZE = 1048576; // 1MiB public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
public static final int DEFAULT_STREAM_WINDOW_SIZE = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
private final SocketAddress serverAddress; private final SocketAddress serverAddress;
private NegotiationType negotiationType = NegotiationType.TLS; private NegotiationType negotiationType = NegotiationType.TLS;
private Class<? extends Channel> channelType = NioSocketChannel.class; private Class<? extends Channel> channelType = NioSocketChannel.class;
private EventLoopGroup userEventLoopGroup; private EventLoopGroup userEventLoopGroup;
private SslContext sslContext; private SslContext sslContext;
private int connectionWindowSize = DEFAULT_CONNECTION_WINDOW_SIZE; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int streamWindowSize = DEFAULT_STREAM_WINDOW_SIZE;
/** /**
* Creates a new builder with the given server address. * Creates a new builder with the given server address.
@ -123,22 +121,12 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
} }
/** /**
* Sets the HTTP/2 connection flow control window. If not called, the default value * Sets the flow control window in bytes. If not called, the default value
* is {@link #DEFAULT_CONNECTION_WINDOW_SIZE}). * is {@link #DEFAULT_FLOW_CONTROL_WINDOW}).
*/ */
public NettyChannelBuilder connectionWindowSize(int connectionWindowSize) { public NettyChannelBuilder flowControlWindow(int flowControlWindow) {
Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
return this;
}
/**
* Sets the HTTP/2 per-stream flow control window. If not called, the default value
* is {@link #DEFAULT_STREAM_WINDOW_SIZE}).
*/
public NettyChannelBuilder streamWindowSize(int streamWindowSize) {
Preconditions.checkArgument(streamWindowSize > 0, "streamWindowSize must be positive");
this.streamWindowSize = streamWindowSize;
return this; return this;
} }
@ -148,8 +136,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) : userEventLoopGroup; ? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) : userEventLoopGroup;
final NegotiationType negotiationType = this.negotiationType; final NegotiationType negotiationType = this.negotiationType;
final Class<? extends Channel> channelType = this.channelType; final Class<? extends Channel> channelType = this.channelType;
final int connectionWindowSize = this.connectionWindowSize; final int flowControlWindow = this.flowControlWindow;
final int streamWindowSize = this.streamWindowSize;
final ProtocolNegotiator negotiator; final ProtocolNegotiator negotiator;
switch (negotiationType) { switch (negotiationType) {
case PLAINTEXT: case PLAINTEXT:
@ -179,7 +166,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
@Override @Override
public ClientTransport newClientTransport() { public ClientTransport newClientTransport() {
return new NettyClientTransport(serverAddress, channelType, group, return new NettyClientTransport(serverAddress, channelType, group,
negotiator, connectionWindowSize, streamWindowSize); negotiator, flowControlWindow);
} }
}; };
Runnable terminationRunnable = null; Runnable terminationRunnable = null;

View File

@ -80,7 +80,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
private final Ticker ticker; private final Ticker ticker;
private final Random random = new Random(); private final Random random = new Random();
private WriteQueue clientWriteQueue; private WriteQueue clientWriteQueue;
private int connectionWindowSize; private int flowControlWindow;
private Http2Settings initialSettings = new Http2Settings(); private Http2Settings initialSettings = new Http2Settings();
private Throwable connectionError; private Throwable connectionError;
private Http2Ping ping; private Http2Ping ping;
@ -90,19 +90,19 @@ class NettyClientHandler extends Http2ConnectionHandler {
public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection, public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection,
Http2FrameReader frameReader, Http2FrameReader frameReader,
int connectionWindowSize, int streamWindowSize) { int flowControlWindow) {
this(encoder, connection, frameReader, connectionWindowSize, streamWindowSize, this(encoder, connection, frameReader, flowControlWindow,
Ticker.systemTicker()); Ticker.systemTicker());
} }
@VisibleForTesting @VisibleForTesting
NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection, NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection,
Http2FrameReader frameReader, int connectionWindowSize, int streamWindowSize, Ticker ticker) { Http2FrameReader frameReader, int flowControlWindow, Ticker ticker) {
super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
new LazyFrameListener()), encoder); new LazyFrameListener()), encoder);
this.ticker = ticker; this.ticker = ticker;
Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
initListener(); initListener();
@ -121,7 +121,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
// frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS // frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS
// to the super class constructor. // to the super class constructor.
initialSettings.pushEnabled(false); initialSettings.pushEnabled(false);
initialSettings.initialWindowSize(streamWindowSize); initialSettings.initialWindowSize(flowControlWindow);
initialSettings.maxConcurrentStreams(0); initialSettings.maxConcurrentStreams(0);
} }
@ -261,7 +261,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
Http2Stream stream = connection().stream(http2Ex.streamId()); Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream != null) { if (stream != null) {
clientStream(stream).transportReportStatus(Status.fromThrowable(cause), false, clientStream(stream).transportReportStatus(Status.fromThrowable(cause), false,
new Metadata.Trailers()); new Metadata.Trailers());
} }
// Delegate to the base class to send a RST_STREAM. // Delegate to the base class to send a RST_STREAM.
@ -480,13 +480,13 @@ class NettyClientHandler extends Http2ConnectionHandler {
} }
// Send the initial connection window if different than the default. // Send the initial connection window if different than the default.
if (connectionWindowSize > 0) { if (flowControlWindow > 0) {
needToFlush = true; needToFlush = true;
Http2Stream connectionStream = connection().connectionStream(); Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream); int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = connectionWindowSize - currentSize; int delta = flowControlWindow - currentSize;
decoder().flowController().incrementWindowSize(ctx, connectionStream, delta); decoder().flowController().incrementWindowSize(ctx, connectionStream, delta);
connectionWindowSize = -1; flowControlWindow = -1;
} }
if (needToFlush) { if (needToFlush) {

View File

@ -82,8 +82,7 @@ class NettyClientTransport implements ClientTransport {
private final ProtocolNegotiator.Handler negotiationHandler; private final ProtocolNegotiator.Handler negotiationHandler;
private final NettyClientHandler handler; private final NettyClientHandler handler;
private final AsciiString authority; private final AsciiString authority;
private final int connectionWindowSize; private final int flowControlWindow;
private final int streamWindowSize;
// We should not send on the channel until negotiation completes. This is a hard requirement // We should not send on the channel until negotiation completes. This is a hard requirement
// by SslHandler but is appropriate for HTTP/1.1 Upgrade as well. // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
private Channel channel; private Channel channel;
@ -97,13 +96,12 @@ class NettyClientTransport implements ClientTransport {
NettyClientTransport(SocketAddress address, Class<? extends Channel> channelType, NettyClientTransport(SocketAddress address, Class<? extends Channel> channelType,
EventLoopGroup group, ProtocolNegotiator negotiator, EventLoopGroup group, ProtocolNegotiator negotiator,
int connectionWindowSize, int streamWindowSize) { int flowControlWindow) {
Preconditions.checkNotNull(negotiator, "negotiator"); Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address"); this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group"); this.group = Preconditions.checkNotNull(group, "group");
this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
this.streamWindowSize = streamWindowSize;
if (address instanceof InetSocketAddress) { if (address instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) address; InetSocketAddress inetAddress = (InetSocketAddress) address;
@ -232,7 +230,6 @@ class NettyClientTransport implements ClientTransport {
BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder( BufferingHttp2ConnectionEncoder encoder = new BufferingHttp2ConnectionEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter)); new DefaultHttp2ConnectionEncoder(connection, frameWriter));
return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, return new NettyClientHandler(encoder, connection, frameReader, flowControlWindow);
streamWindowSize);
} }
} }

View File

@ -69,27 +69,25 @@ public class NettyServer implements Server {
private final int maxStreamsPerConnection; private final int maxStreamsPerConnection;
private ServerListener listener; private ServerListener listener;
private Channel channel; private Channel channel;
private int connectionWindowSize; private int flowControlWindow;
private int streamWindowSize;
NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType, NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
EventLoopGroup bossGroup, EventLoopGroup workerGroup, int maxStreamsPerConnection, EventLoopGroup bossGroup, EventLoopGroup workerGroup, int maxStreamsPerConnection,
int connectionWindowSize, int streamWindowSize) { int flowControlWindow) {
this(address, channelType, bossGroup, workerGroup, null, maxStreamsPerConnection, this(address, channelType, bossGroup, workerGroup, null, maxStreamsPerConnection,
connectionWindowSize, streamWindowSize); flowControlWindow);
} }
NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType, NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable SslContext sslContext, EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable SslContext sslContext,
int maxStreamsPerConnection, int connectionWindowSize, int streamWindowSize) { int maxStreamsPerConnection, int flowControlWindow) {
this.address = address; this.address = address;
this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup"); this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup");
this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup"); this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup");
this.sslContext = sslContext; this.sslContext = sslContext;
this.maxStreamsPerConnection = maxStreamsPerConnection; this.maxStreamsPerConnection = maxStreamsPerConnection;
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
this.streamWindowSize = streamWindowSize;
} }
@Override @Override
@ -106,8 +104,7 @@ public class NettyServer implements Server {
@Override @Override
public void initChannel(Channel ch) throws Exception { public void initChannel(Channel ch) throws Exception {
NettyServerTransport transport NettyServerTransport transport
= new NettyServerTransport(ch, sslContext, maxStreamsPerConnection, = new NettyServerTransport(ch, sslContext, maxStreamsPerConnection, flowControlWindow);
connectionWindowSize, streamWindowSize);
transport.start(listener.transportCreated(transport)); transport.start(listener.transportCreated(transport));
} }
}); });

View File

@ -36,10 +36,10 @@ import com.google.common.base.Preconditions;
import io.grpc.AbstractServerBuilder; import io.grpc.AbstractServerBuilder;
import io.grpc.HandlerRegistry; import io.grpc.HandlerRegistry;
import io.grpc.SharedResourceHolder; import io.grpc.SharedResourceHolder;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -49,8 +49,7 @@ import java.net.SocketAddress;
* A builder to help simplify the construction of a Netty-based GRPC server. * A builder to help simplify the construction of a Netty-based GRPC server.
*/ */
public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> { public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
public static final int DEFAULT_CONNECTION_WINDOW_SIZE = 1048576; // 1MiB public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
public static final int DEFAULT_STREAM_WINDOW_SIZE = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
private final SocketAddress address; private final SocketAddress address;
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class; private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
@ -58,8 +57,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
private EventLoopGroup userWorkerEventLoopGroup; private EventLoopGroup userWorkerEventLoopGroup;
private SslContext sslContext; private SslContext sslContext;
private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE; private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
private int connectionWindowSize = DEFAULT_CONNECTION_WINDOW_SIZE; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int streamWindowSize = DEFAULT_STREAM_WINDOW_SIZE;
/** /**
* Creates a server builder that will bind to the given port. * Creates a server builder that will bind to the given port.
@ -180,22 +178,12 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
} }
/** /**
* Sets the HTTP/2 connection flow control window. If not called, the default value * Sets the HTTP/2 flow control window. If not called, the default value
* is {@link #DEFAULT_CONNECTION_WINDOW_SIZE}). * is {@link #DEFAULT_FLOW_CONTROL_WINDOW}).
*/ */
public NettyServerBuilder connectionWindowSize(int connectionWindowSize) { public NettyServerBuilder flowControlWindow(int flowControlWindow) {
Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
return this;
}
/**
* Sets the HTTP/2 per-stream flow control window. If not called, the default value
* is {@link #DEFAULT_STREAM_WINDOW_SIZE}).
*/
public NettyServerBuilder streamWindowSize(int streamWindowSize) {
Preconditions.checkArgument(streamWindowSize > 0, "streamWindowSize must be positive");
this.streamWindowSize = streamWindowSize;
return this; return this;
} }
@ -207,8 +195,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) ? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
: userWorkerEventLoopGroup; : userWorkerEventLoopGroup;
NettyServer server = new NettyServer(address, channelType, bossEventLoopGroup, NettyServer server = new NettyServer(address, channelType, bossEventLoopGroup,
workerEventLoopGroup, sslContext, maxConcurrentCallsPerConnection, connectionWindowSize, workerEventLoopGroup, sslContext, maxConcurrentCallsPerConnection, flowControlWindow);
streamWindowSize);
Runnable terminationRunnable = new Runnable() { Runnable terminationRunnable = new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -83,7 +83,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
private Throwable connectionError; private Throwable connectionError;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private boolean teWarningLogged; private boolean teWarningLogged;
private int connectionWindowSize; private int flowControlWindow;
private Http2Settings initialSettings = new Http2Settings(); private Http2Settings initialSettings = new Http2Settings();
private WriteQueue serverWriteQueue; private WriteQueue serverWriteQueue;
@ -92,11 +92,10 @@ class NettyServerHandler extends Http2ConnectionHandler {
Http2FrameReader frameReader, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameWriter frameWriter,
int maxStreams, int maxStreams,
int connectionWindowSize, int flowControlWindow) {
int streamWindowSize) {
super(connection, frameReader, frameWriter, new LazyFrameListener()); super(connection, frameReader, frameWriter, new LazyFrameListener());
Preconditions.checkArgument(connectionWindowSize > 0, "connectionWindowSize must be positive"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
streamKey = connection.newKey(); streamKey = connection.newKey();
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
@ -105,7 +104,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
// TODO(nmittler): this is a temporary hack as we currently have to send a 2nd SETTINGS // TODO(nmittler): this is a temporary hack as we currently have to send a 2nd SETTINGS
// frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS // frame. Once we upgrade to Netty 4.1.Beta6 we'll be able to pass in the initial SETTINGS
// to the super class constructor. // to the super class constructor.
initialSettings.initialWindowSize(streamWindowSize); initialSettings.initialWindowSize(flowControlWindow);
initialSettings.maxConcurrentStreams(maxStreams); initialSettings.maxConcurrentStreams(maxStreams);
} }
@ -351,13 +350,13 @@ class NettyServerHandler extends Http2ConnectionHandler {
} }
// Send the initial connection window if different than the default. // Send the initial connection window if different than the default.
if (connectionWindowSize > 0) { if (flowControlWindow > 0) {
needToFlush = true; needToFlush = true;
Http2Stream connectionStream = connection().connectionStream(); Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream); int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = connectionWindowSize - currentSize; int delta = flowControlWindow - currentSize;
decoder().flowController().incrementWindowSize(ctx, connectionStream, delta); decoder().flowController().incrementWindowSize(ctx, connectionStream, delta);
connectionWindowSize = -1; flowControlWindow = -1;
} }
if (needToFlush) { if (needToFlush) {

View File

@ -67,16 +67,14 @@ class NettyServerTransport implements ServerTransport {
private final int maxStreams; private final int maxStreams;
private ServerTransportListener listener; private ServerTransportListener listener;
private boolean terminated; private boolean terminated;
private int connectionWindowSize; private int flowControlWindow;
private int streamWindowSize;
NettyServerTransport(Channel channel, @Nullable SslContext sslContext, int maxStreams, NettyServerTransport(Channel channel, @Nullable SslContext sslContext, int maxStreams,
int connectionWindowSize, int streamWindowSize) { int flowControlWindow) {
this.channel = Preconditions.checkNotNull(channel, "channel"); this.channel = Preconditions.checkNotNull(channel, "channel");
this.sslContext = sslContext; this.sslContext = sslContext;
this.maxStreams = maxStreams; this.maxStreams = maxStreams;
this.connectionWindowSize = connectionWindowSize; this.flowControlWindow = flowControlWindow;
this.streamWindowSize = streamWindowSize;
} }
public void start(ServerTransportListener listener) { public void start(ServerTransportListener listener) {
@ -137,6 +135,6 @@ class NettyServerTransport implements ServerTransport {
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, return new NettyServerHandler(transportListener, connection, frameReader, frameWriter,
maxStreams, connectionWindowSize, streamWindowSize); maxStreams, flowControlWindow);
} }
} }

View File

@ -67,6 +67,7 @@ import io.grpc.Status.Code;
import io.grpc.StatusException; import io.grpc.StatusException;
import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport;
import io.grpc.transport.ClientTransport.PingCallback; import io.grpc.transport.ClientTransport.PingCallback;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -129,7 +130,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
frameWriter = new DefaultHttp2FrameWriter(); frameWriter = new DefaultHttp2FrameWriter();
frameReader = new DefaultHttp2FrameReader(); frameReader = new DefaultHttp2FrameReader();
handler = newHandler(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); handler = newHandler(DEFAULT_WINDOW_SIZE);
content = Unpooled.copiedBuffer("hello world", UTF_8); content = Unpooled.copiedBuffer("hello world", UTF_8);
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);
@ -314,7 +315,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
assertTrue(promise.isDone()); assertTrue(promise.isDone());
assertFalse(promise.isSuccess()); assertFalse(promise.isSuccess());
verify(stream).transportReportStatus(any(Status.class), eq(false), verify(stream).transportReportStatus(any(Status.class), eq(false),
notNull(Metadata.Trailers.class)); notNull(Metadata.Trailers.class));
} }
@Test @Test
@ -323,13 +324,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
// Read a GOAWAY that indicates our stream was never processed by the server. // Read a GOAWAY that indicates our stream was never processed by the server.
handler.channelRead(ctx, handler.channelRead(ctx,
goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(stream).transportReportStatus(captor.capture(), eq(false), verify(stream).transportReportStatus(captor.capture(), eq(false),
notNull(Metadata.Trailers.class)); notNull(Metadata.Trailers.class));
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
assertEquals("HTTP/2 error code: CANCEL\nthis is a test", assertEquals("HTTP/2 error code: CANCEL\nthis is a test",
captor.getValue().getDescription()); captor.getValue().getDescription());
} }
@Test @Test
@ -358,7 +359,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
when(stream.id()).thenReturn(3); when(stream.id()).thenReturn(3);
writeQueue.enqueue(new CancelStreamCommand(stream), true); writeQueue.enqueue(new CancelStreamCommand(stream), true);
verify(stream).transportReportStatus(eq(Status.CANCELLED), eq(true), verify(stream).transportReportStatus(eq(Status.CANCELLED), eq(true),
any(Metadata.Trailers.class)); any(Metadata.Trailers.class));
} }
@Test @Test
@ -379,14 +380,14 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
InOrder inOrder = inOrder(stream); InOrder inOrder = inOrder(stream);
inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(), eq(false), inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(), eq(false),
notNull(Metadata.Trailers.class)); notNull(Metadata.Trailers.class));
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
} }
@Test @Test
public void connectionWindowShouldBeOverridden() throws Exception { public void connectionWindowShouldBeOverridden() throws Exception {
int connectionWindow = 1048576; // 1MiB int connectionWindow = 1048576; // 1MiB
handler = newHandler(connectionWindow, DEFAULT_WINDOW_SIZE); handler = newHandler(connectionWindow);
handler.handlerAdded(ctx); handler.handlerAdded(ctx);
Http2Stream connectionStream = handler.connection().connectionStream(); Http2Stream connectionStream = handler.connection().connectionStream();
Http2FlowController localFlowController = handler.connection().local().flowController(); Http2FlowController localFlowController = handler.connection().local().flowController();
@ -534,7 +535,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
mockContext(); mockContext();
} }
private NettyClientHandler newHandler(int connectionWindowSize, int streamWindowSize) { private NettyClientHandler newHandler(int connectionWindowSize) {
Http2Connection connection = new DefaultHttp2Connection(false); Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
@ -546,8 +547,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
return nanoTime; return nanoTime;
} }
}; };
return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, return new NettyClientHandler(encoder, connection, frameReader, connectionWindowSize, ticker);
streamWindowSize, ticker);
} }
private AsciiString as(String string) { private AsciiString as(String string) {

View File

@ -182,7 +182,7 @@ public class NettyClientTransportTest {
private NettyClientTransport newTransport(ProtocolNegotiator negotiator) { private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
NettyClientTransport transport = new NettyClientTransport(address, NioSocketChannel.class, NettyClientTransport transport = new NettyClientTransport(address, NioSocketChannel.class,
group, negotiator, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); group, negotiator, DEFAULT_WINDOW_SIZE);
transports.add(transport); transports.add(transport);
return transport; return transport;
} }
@ -198,7 +198,7 @@ public class NettyClientTransportTest {
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
server = new NettyServer(address, NioServerSocketChannel.class, server = new NettyServer(address, NioServerSocketChannel.class,
group, group, serverContext, maxStreamsPerConnection, group, group, serverContext, maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); DEFAULT_WINDOW_SIZE);
server.start(serverListener); server.start(serverListener);
} }

View File

@ -301,7 +301,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
Http2Connection connection = new DefaultHttp2Connection(true); Http2Connection connection = new DefaultHttp2Connection(true);
handler = handler =
new NettyServerHandler(transportListener, connection, new DefaultHttp2FrameReader(), new NettyServerHandler(transportListener, connection, new DefaultHttp2FrameReader(),
frameWriter, maxConcurrentStreams, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); frameWriter, maxConcurrentStreams, DEFAULT_WINDOW_SIZE);
when(channel.isActive()).thenReturn(true); when(channel.isActive()).thenReturn(true);
mockContext(); mockContext();
@ -324,15 +324,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
@Test @Test
public void connectionWindowShouldBeOverridden() throws Exception { public void connectionWindowShouldBeOverridden() throws Exception {
int connectionWindow = 1048576; // 1MiB int flowControlWindow = 1048576; // 1MiB
handler = newHandler(transportListener, connectionWindow, DEFAULT_WINDOW_SIZE); handler = newHandler(transportListener, flowControlWindow);
handler.handlerAdded(ctx); handler.handlerAdded(ctx);
Http2Stream connectionStream = handler.connection().connectionStream(); Http2Stream connectionStream = handler.connection().connectionStream();
Http2FlowController localFlowController = handler.connection().local().flowController(); Http2FlowController localFlowController = handler.connection().local().flowController();
int actualInitialWindowSize = localFlowController.initialWindowSize(connectionStream); int actualInitialWindowSize = localFlowController.initialWindowSize(connectionStream);
int actualWindowSize = localFlowController.windowSize(connectionStream); int actualWindowSize = localFlowController.windowSize(connectionStream);
assertEquals(connectionWindow, actualWindowSize); assertEquals(flowControlWindow, actualWindowSize);
assertEquals(connectionWindow, actualInitialWindowSize); assertEquals(flowControlWindow, actualInitialWindowSize);
} }
private void createStream() throws Exception { private void createStream() throws Exception {
@ -388,16 +388,15 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
} }
private static NettyServerHandler newHandler(ServerTransportListener transportListener, private static NettyServerHandler newHandler(ServerTransportListener transportListener,
int connectionWindowSize, int flowControlWindow) {
int streamWindowSize) {
Http2Connection connection = new DefaultHttp2Connection(true); Http2Connection connection = new DefaultHttp2Connection(true);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, return new NettyServerHandler(transportListener, connection, frameReader, frameWriter,
Integer.MAX_VALUE, connectionWindowSize, streamWindowSize); Integer.MAX_VALUE, flowControlWindow);
} }
private static NettyServerHandler newHandler(ServerTransportListener transportListener) { private static NettyServerHandler newHandler(ServerTransportListener transportListener) {
return newHandler(transportListener, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); return newHandler(transportListener, DEFAULT_WINDOW_SIZE);
} }
} }

View File

@ -36,8 +36,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.squareup.okhttp.CipherSuite; import com.squareup.okhttp.CipherSuite;
import com.squareup.okhttp.ConnectionSpec; import com.squareup.okhttp.ConnectionSpec;
import com.squareup.okhttp.TlsVersion; import com.squareup.okhttp.TlsVersion;
import io.grpc.AbstractChannelBuilder; import io.grpc.AbstractChannelBuilder;
import io.grpc.SharedResourceHolder; import io.grpc.SharedResourceHolder;
import io.grpc.SharedResourceHolder.Resource; import io.grpc.SharedResourceHolder.Resource;