netty: allow to use bandwidth delay product (#6979)

This commit is contained in:
Jihun Cho 2020-05-01 15:39:22 -07:00 committed by GitHub
parent 50a829ad9d
commit 83a3b25e80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 365 additions and 200 deletions

View File

@ -1907,7 +1907,7 @@ public abstract class AbstractInteropTest {
* Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is * Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is
* chosen as a maximum amount of memory a large test would need. * chosen as a maximum amount of memory a large test would need.
*/ */
private static void assumeEnoughMemory() { protected static void assumeEnoughMemory() {
Runtime r = Runtime.getRuntime(); Runtime r = Runtime.getRuntime();
long usedMem = r.totalMemory() - r.freeMemory(); long usedMem = r.totalMemory() - r.freeMemory();
long actuallyFreeMemory = r.maxMemory() - usedMem; long actuallyFreeMemory = r.maxMemory() - usedMem;

View File

@ -18,23 +18,15 @@ package io.grpc.testing.integration;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalHandlerSettings;
import io.grpc.netty.NegotiationType; import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import org.junit.BeforeClass;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class AutoWindowSizingOnTest extends AbstractInteropTest { public class AutoWindowSizingOnTest extends AbstractInteropTest {
@BeforeClass
public static void turnOnAutoWindow() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}
@Override @Override
protected AbstractServerImplBuilder<?> getServerBuilder() { protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0) return NettyServerBuilder.forPort(0)
@ -45,7 +37,8 @@ public class AutoWindowSizingOnTest extends AbstractInteropTest {
protected ManagedChannel createChannel() { protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.negotiationType(NegotiationType.PLAINTEXT) .negotiationType(NegotiationType.PLAINTEXT)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead. // Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false); io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor()).build(); return builder.intercept(createCensusStatsClientInterceptor()).build();

View File

@ -24,7 +24,11 @@ import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors; import io.grpc.ServerInterceptors;
import io.grpc.netty.InternalHandlerSettings; import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.NegotiationType; import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
@ -32,6 +36,9 @@ import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ResponseParameters; import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -40,10 +47,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
@ -63,9 +71,11 @@ public class NettyFlowControlTest {
private static final int REGULAR_WINDOW = 64 * 1024; private static final int REGULAR_WINDOW = 64 * 1024;
private static final int MAX_WINDOW = 8 * 1024 * 1024; private static final int MAX_WINDOW = 8 * 1024 * 1024;
private static ManagedChannel channel; private final CapturingProtocolNegotiationFactory capturingPnFactory
private static Server server; = new CapturingProtocolNegotiationFactory();
private static TrafficControlProxy proxy; private ManagedChannel channel;
private Server server;
private TrafficControlProxy proxy;
private int proxyPort; private int proxyPort;
private int serverPort; private int serverPort;
@ -74,11 +84,6 @@ public class NettyFlowControlTest {
new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor(1, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("flowcontrol-test-pool", true)); new DefaultThreadFactory("flowcontrol-test-pool", true));
@BeforeClass
public static void setUp() {
InternalHandlerSettings.enable(true);
InternalHandlerSettings.autoWindowOn(true);
}
@AfterClass @AfterClass
public static void shutDownTests() { public static void shutDownTests() {
@ -93,8 +98,13 @@ public class NettyFlowControlTest {
@After @After
public void endTest() throws IOException { public void endTest() throws IOException {
proxy.shutDown(); if (proxy != null) {
server.shutdown(); proxy.shutDown();
}
server.shutdownNow();
if (channel != null) {
channel.shutdownNow();
}
} }
@Test @Test
@ -102,7 +112,7 @@ public class NettyFlowControlTest {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start(); proxy.start();
proxyPort = proxy.getPort(); proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW); createAndStartChannel(REGULAR_WINDOW);
doTest(HIGH_BAND, MED_LAT); doTest(HIGH_BAND, MED_LAT);
} }
@ -111,16 +121,17 @@ public class NettyFlowControlTest {
proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start(); proxy.start();
proxyPort = proxy.getPort(); proxyPort = proxy.getPort();
resetConnection(REGULAR_WINDOW); createAndStartChannel(REGULAR_WINDOW);
doTest(LOW_BAND, MED_LAT); doTest(LOW_BAND, MED_LAT);
} }
@Test @Test
@Ignore("enable once 2 pings between data is no longer necessary")
public void verySmallWindowMakesProgress() throws InterruptedException, IOException { public void verySmallWindowMakesProgress() throws InterruptedException, IOException {
proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
proxy.start(); proxy.start();
proxyPort = proxy.getPort(); proxyPort = proxy.getPort();
resetConnection(TINY_WINDOW); createAndStartChannel(TINY_WINDOW);
doTest(HIGH_BAND, MED_LAT); doTest(HIGH_BAND, MED_LAT);
} }
@ -142,9 +153,10 @@ public class NettyFlowControlTest {
.addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2)); .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2));
StreamingOutputCallRequest request = builder.build(); StreamingOutputCallRequest request = builder.build();
TestStreamObserver observer = new TestStreamObserver(expectedWindow); TestStreamObserver observer =
new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow);
stub.streamingOutputCall(request, observer); stub.streamingOutputCall(request, observer);
int lastWindow = observer.waitFor(); int lastWindow = observer.waitFor(5, TimeUnit.SECONDS);
// deal with cases that either don't cause a window update or hit max window // deal with cases that either don't cause a window update or hit max window
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW)); expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
@ -160,24 +172,21 @@ public class NettyFlowControlTest {
/** /**
* Resets client/server and their flow control windows. * Resets client/server and their flow control windows.
*/ */
private void resetConnection(int clientFlowControlWindow) private void createAndStartChannel(int clientFlowControlWindow) {
throws InterruptedException { NettyChannelBuilder channelBuilder =
if (channel != null) { NettyChannelBuilder
if (!channel.isShutdown()) { .forAddress(new InetSocketAddress("localhost", proxyPort))
channel.shutdown(); .initialFlowControlWindow(clientFlowControlWindow)
channel.awaitTermination(100, TimeUnit.MILLISECONDS); .negotiationType(NegotiationType.PLAINTEXT);
} InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory);
} channel = channelBuilder.build();
channel = NettyChannelBuilder.forAddress(new InetSocketAddress("localhost", proxyPort))
.flowControlWindow(clientFlowControlWindow)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
} }
private void startServer(int serverFlowControlWindow) { private void startServer(int serverFlowControlWindow) {
ServerBuilder<?> builder = ServerBuilder<?> builder =
NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0)) NettyServerBuilder
.flowControlWindow(serverFlowControlWindow); .forAddress(new InetSocketAddress("localhost", 0))
.initialFlowControlWindow(serverFlowControlWindow);
builder.addService(ServerInterceptors.intercept( builder.addService(ServerInterceptors.intercept(
new TestServiceImpl(Executors.newScheduledThreadPool(2)), new TestServiceImpl(Executors.newScheduledThreadPool(2)),
ImmutableList.<ServerInterceptor>of())); ImmutableList.<ServerInterceptor>of()));
@ -193,20 +202,25 @@ public class NettyFlowControlTest {
*/ */
private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> { private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {
long startRequestNanos; final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
final long startRequestNanos;
long endRequestNanos; long endRequestNanos;
private final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
long expectedWindow; final long expectedWindow;
int lastWindow; int lastWindow;
public TestStreamObserver(long window) { public TestStreamObserver(
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
this.grpcHandlerRef = grpcHandlerRef;
startRequestNanos = System.nanoTime(); startRequestNanos = System.nanoTime();
expectedWindow = window; expectedWindow = window;
} }
@Override @Override
public void onNext(StreamingOutputCallResponse value) { public void onNext(StreamingOutputCallResponse value) {
lastWindow = InternalHandlerSettings.getLatestClientWindow(); GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
if (lastWindow >= expectedWindow) { if (lastWindow >= expectedWindow) {
onCompleted(); onCompleted();
} }
@ -227,9 +241,40 @@ public class NettyFlowControlTest {
return endRequestNanos - startRequestNanos; return endRequestNanos - startRequestNanos;
} }
public int waitFor() throws InterruptedException { public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
latch.await(); latch.await(duration, unit);
return lastWindow; return lastWindow;
} }
} }
private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory {
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>();
@Override
public ProtocolNegotiator buildProtocolNegotiator() {
return new CapturingProtocolNegotiator();
}
private class CapturingProtocolNegotiator implements ProtocolNegotiator {
final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext();
@Override
public AsciiString scheme() {
return delegate.scheme();
}
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler);
return delegate.newHandler(grpcHandler);
}
@Override
public void close() {
delegate.close();
}
}
}
} }

View File

@ -16,9 +16,12 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionDecoder;
@ -35,10 +38,15 @@ import java.util.concurrent.TimeUnit;
*/ */
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
private boolean autoTuneFlowControlOn = false; private static final int MAX_ALLOWED_PING = 2;
private int initialConnectionWindow;
private final int initialConnectionWindow;
private final PingCountingListener pingCountingListener = new PingCountingListener();
private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING);
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private final FlowControlPinger flowControlPing = new FlowControlPinger(); private boolean initialWindowSent = false;
private static final long BDP_MEASUREMENT_PING = 1234; private static final long BDP_MEASUREMENT_PING = 1234;
@ -46,7 +54,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
ChannelPromise channelUnused, ChannelPromise channelUnused,
Http2ConnectionDecoder decoder, Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) { Http2Settings initialSettings,
boolean autoFlowControl) {
super(channelUnused, decoder, encoder, initialSettings); super(channelUnused, decoder, encoder, initialSettings);
// During a graceful shutdown, wait until all streams are closed. // During a graceful shutdown, wait until all streams are closed.
@ -55,6 +64,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
// Extract the connection window from the settings if it was set. // Extract the connection window from the settings if it was set.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize(); initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (encoder instanceof ListeningEncoder) {
((ListeningEncoder) encoder).setListener(pingCountingListener);
}
} }
@Override @Override
@ -92,12 +105,12 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
* Sends initial connection window to the remote endpoint if necessary. * Sends initial connection window to the remote endpoint if necessary.
*/ */
private void sendInitialConnectionWindow() throws Http2Exception { private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialConnectionWindow > 0) { if (!initialWindowSent && ctx.channel().isActive()) {
Http2Stream connectionStream = connection().connectionStream(); Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream); int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialConnectionWindow - currentSize; int delta = initialConnectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta); decoder().flowController().incrementWindowSize(connectionStream, delta);
initialConnectionWindow = -1; initialWindowSent = true;
ctx.flush(); ctx.flush();
} }
} }
@ -118,6 +131,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
final class FlowControlPinger { final class FlowControlPinger {
private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
private final int maxAllowedPing;
private int pingCount; private int pingCount;
private int pingReturn; private int pingReturn;
private boolean pinging; private boolean pinging;
@ -125,6 +139,11 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private float lastBandwidth; // bytes per second private float lastBandwidth; // bytes per second
private long lastPingTime; private long lastPingTime;
public FlowControlPinger(int maxAllowedPing) {
checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive");
this.maxAllowedPing = maxAllowedPing;
}
public long payload() { public long payload() {
return BDP_MEASUREMENT_PING; return BDP_MEASUREMENT_PING;
} }
@ -137,7 +156,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
if (!autoTuneFlowControlOn) { if (!autoTuneFlowControlOn) {
return; return;
} }
if (!isPinging()) { if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) {
setPinging(true); setPinging(true);
sendPing(ctx()); sendPing(ctx());
} }
@ -168,7 +187,6 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
settings.initialWindowSize(targetWindow); settings.initialWindowSize(targetWindow);
frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
} }
} }
private boolean isPinging() { private boolean isPinging() {
@ -216,4 +234,28 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1); lastPingTime = System.nanoTime() - TimeUnit.SECONDS.toNanos(1);
} }
} }
private static class PingCountingListener extends Http2OutboundFrameListener {
int pingCount = 0;
@Override
public void onWindowUpdate(int streamId, int windowSizeIncrement) {
pingCount = 0;
super.onWindowUpdate(streamId, windowSizeIncrement);
}
@Override
public void onPing(boolean ack, long data) {
if (!ack) {
pingCount++;
}
super.onPing(ack, data);
}
@Override
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {
pingCount = 0;
super.onData(streamId, data, padding, endStream);
}
}
} }

View File

@ -1,44 +0,0 @@
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
/**
* Controlled accessor to {@link NettyHandlerSettings}.
*/
@VisibleForTesting // Visible for tests in other packages.
@Internal
public final class InternalHandlerSettings {
public static void enable(boolean enable) {
NettyHandlerSettings.enable(enable);
}
public static synchronized void autoWindowOn(boolean autoFlowControl) {
NettyHandlerSettings.autoWindowOn(autoFlowControl);
}
public static synchronized int getLatestClientWindow() {
return NettyHandlerSettings.getLatestClientWindow();
}
public static synchronized int getLatestServerWindow() {
return NettyHandlerSettings.getLatestServerWindow();
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.StreamBufferingEncoder;
/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */
interface ListeningEncoder {
void setListener(Http2OutboundFrameListener listener);
/**
* Partial implementation of (Listening subset of event) event listener for outbound http2
* frames.
*/
class Http2OutboundFrameListener {
/** Notifies on outbound WINDOW_UPDATE frame. */
public void onWindowUpdate(int streamId, int windowSizeIncrement) {}
/** Notifies on outbound PING frame. */
public void onPing(boolean ack, long data) {}
/** Notifies on outbound DATA frame. */
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {}
}
/** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */
final class ListeningStreamBufferingEncoder
extends StreamBufferingEncoder implements ListeningEncoder {
private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();
public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) {
super(encoder);
}
@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}
@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}
/** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */
final class ListeningDefaultHttp2ConnectionEncoder
extends DefaultHttp2ConnectionEncoder implements ListeningEncoder {
private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();
public ListeningDefaultHttp2ConnectionEncoder(
Http2Connection connection, Http2FrameWriter frameWriter) {
super(connection, frameWriter);
}
@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}
@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}
}

View File

@ -84,6 +84,7 @@ public final class NettyChannelBuilder
private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY; private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY;
private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL;
private SslContext sslContext; private SslContext sslContext;
private boolean autoFlowControl;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
@ -247,12 +248,26 @@ public final class NettyChannelBuilder
} }
/** /**
* Sets the flow control window in bytes. If not called, the default value * Sets the initial flow control window in bytes. Setting initial flow control window enables auto
* is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control
* tuning, use {@link #flowControlWindow(int)}.
*/
public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow) {
checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive");
this.flowControlWindow = initialFlowControlWindow;
this.autoFlowControl = true;
return this;
}
/**
* Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control
* tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not
* called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}).
*/ */
public NettyChannelBuilder flowControlWindow(int flowControlWindow) { public NettyChannelBuilder flowControlWindow(int flowControlWindow) {
checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.autoFlowControl = false;
return this; return this;
} }
@ -405,7 +420,7 @@ public final class NettyChannelBuilder
return new NettyTransportFactory( return new NettyTransportFactory(
negotiator, channelFactory, channelOptions, negotiator, channelFactory, channelOptions,
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods); transportTracerFactory, localSocketPicker, useGetForSafeMethods);
} }
@ -521,6 +536,7 @@ public final class NettyChannelBuilder
private final Map<ChannelOption<?>, ?> channelOptions; private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool; private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group; private final EventLoopGroup group;
private final boolean autoFlowControl;
private final int flowControlWindow; private final int flowControlWindow;
private final int maxMessageSize; private final int maxMessageSize;
private final int maxHeaderListSize; private final int maxHeaderListSize;
@ -536,7 +552,7 @@ public final class NettyChannelBuilder
NettyTransportFactory(ProtocolNegotiator protocolNegotiator, NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool, Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize, boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
boolean useGetForSafeMethods) { boolean useGetForSafeMethods) {
@ -545,6 +561,7 @@ public final class NettyChannelBuilder
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool; this.groupPool = groupPool;
this.group = groupPool.getObject(); this.group = groupPool.getObject();
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
@ -584,7 +601,7 @@ public final class NettyChannelBuilder
// TODO(carl-mastrangelo): Pass channelLogger in. // TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelFactory, channelOptions, group, serverAddress, channelFactory, channelOptions, group,
localNegotiator, flowControlWindow, localNegotiator, autoFlowControl, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(), tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(),

View File

@ -38,6 +38,7 @@ import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer; import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.grpc.netty.ListeningEncoder.ListeningStreamBufferingEncoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -57,6 +58,7 @@ import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FlowController; import io.netty.handler.codec.http2.Http2FlowController;
@ -131,6 +133,7 @@ class NettyClientHandler extends AbstractNettyHandler {
static NettyClientHandler newHandler( static NettyClientHandler newHandler(
ClientTransportLifecycleManager lifecycleManager, ClientTransportLifecycleManager lifecycleManager,
@Nullable KeepAliveManager keepAliveManager, @Nullable KeepAliveManager keepAliveManager,
boolean autoFlowControl,
int flowControlWindow, int flowControlWindow,
int maxHeaderListSize, int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory, Supplier<Stopwatch> stopwatchFactory,
@ -155,6 +158,7 @@ class NettyClientHandler extends AbstractNettyHandler {
frameWriter, frameWriter,
lifecycleManager, lifecycleManager,
keepAliveManager, keepAliveManager,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
stopwatchFactory, stopwatchFactory,
@ -171,6 +175,7 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2FrameWriter frameWriter, Http2FrameWriter frameWriter,
ClientTransportLifecycleManager lifecycleManager, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, KeepAliveManager keepAliveManager,
boolean autoFlowControl,
int flowControlWindow, int flowControlWindow,
int maxHeaderListSize, int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory, Supplier<Stopwatch> stopwatchFactory,
@ -192,8 +197,9 @@ class NettyClientHandler extends AbstractNettyHandler {
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
StreamBufferingEncoder encoder = new StreamBufferingEncoder( StreamBufferingEncoder encoder =
new DefaultHttp2ConnectionEncoder(connection, frameWriter)); new ListeningStreamBufferingEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter));
// Create the local flow controller configured to auto-refill the connection window. // Create the local flow controller configured to auto-refill the connection window.
connection.local().flowController( connection.local().flowController(
@ -230,12 +236,13 @@ class NettyClientHandler extends AbstractNettyHandler {
tooManyPingsRunnable, tooManyPingsRunnable,
transportTracer, transportTracer,
eagAttributes, eagAttributes,
authority); authority,
autoFlowControl);
} }
private NettyClientHandler( private NettyClientHandler(
Http2ConnectionDecoder decoder, Http2ConnectionDecoder decoder,
StreamBufferingEncoder encoder, Http2ConnectionEncoder encoder,
Http2Settings settings, Http2Settings settings,
ClientTransportLifecycleManager lifecycleManager, ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager, KeepAliveManager keepAliveManager,
@ -243,8 +250,9 @@ class NettyClientHandler extends AbstractNettyHandler {
final Runnable tooManyPingsRunnable, final Runnable tooManyPingsRunnable,
TransportTracer transportTracer, TransportTracer transportTracer,
Attributes eagAttributes, Attributes eagAttributes,
String authority) { String authority,
super(/* channelUnused= */ null, decoder, encoder, settings); boolean autoFlowControl) {
super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl);
this.lifecycleManager = lifecycleManager; this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager; this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory; this.stopwatchFactory = stopwatchFactory;

View File

@ -79,6 +79,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final String authorityString; private final String authorityString;
private final AsciiString authority; private final AsciiString authority;
private final AsciiString userAgent; private final AsciiString userAgent;
private final boolean autoFlowControl;
private final int flowControlWindow; private final int flowControlWindow;
private final int maxMessageSize; private final int maxMessageSize;
private final int maxHeaderListSize; private final int maxHeaderListSize;
@ -106,8 +107,9 @@ class NettyClientTransport implements ConnectionClientTransport {
NettyClientTransport( NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory, SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group, Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, ProtocolNegotiator negotiator, boolean autoFlowControl, int flowControlWindow,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker, ChannelLogger channelLogger, LocalSocketPicker localSocketPicker, ChannelLogger channelLogger,
@ -118,6 +120,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.group = Preconditions.checkNotNull(group, "group"); this.group = Preconditions.checkNotNull(group, "group");
this.channelFactory = channelFactory; this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
@ -214,6 +217,7 @@ class NettyClientTransport implements ConnectionClientTransport {
handler = NettyClientHandler.newHandler( handler = NettyClientHandler.newHandler(
lifecycleManager, lifecycleManager,
keepAliveManager, keepAliveManager,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
@ -221,7 +225,6 @@ class NettyClientTransport implements ConnectionClientTransport {
transportTracer, transportTracer,
eagAttributes, eagAttributes,
authorityString); authorityString);
NettyHandlerSettings.setAutoWindow(handler);
ChannelHandler negotiationHandler = negotiator.newHandler(handler); ChannelHandler negotiationHandler = negotiator.newHandler(handler);

View File

@ -1,72 +0,0 @@
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import com.google.common.base.Preconditions;
/**
* Allows autoFlowControl to be turned on and off from interop testing and flow control windows to
* be accessed.
*/
final class NettyHandlerSettings {
private static volatile boolean enabled;
private static boolean autoFlowControlOn;
// These will be the most recently created handlers created using NettyClientTransport and
// NettyServerTransport
private static AbstractNettyHandler clientHandler;
private static AbstractNettyHandler serverHandler;
static void setAutoWindow(AbstractNettyHandler handler) {
if (!enabled) {
return;
}
synchronized (NettyHandlerSettings.class) {
handler.setAutoTuneFlowControl(autoFlowControlOn);
if (handler instanceof NettyClientHandler) {
clientHandler = handler;
} else if (handler instanceof NettyServerHandler) {
serverHandler = handler;
} else {
throw new RuntimeException("Expecting NettyClientHandler or NettyServerHandler");
}
}
}
public static void enable(boolean enable) {
enabled = enable;
}
public static synchronized void autoWindowOn(boolean autoFlowControl) {
autoFlowControlOn = autoFlowControl;
}
public static synchronized int getLatestClientWindow() {
return getLatestWindow(clientHandler);
}
public static synchronized int getLatestServerWindow() {
return getLatestWindow(serverHandler);
}
private static synchronized int getLatestWindow(AbstractNettyHandler handler) {
Preconditions.checkNotNull(handler);
return handler.decoder().flowController()
.initialWindowSize(handler.connection().connectionStream());
}
}

View File

@ -78,6 +78,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
private ServerListener listener; private ServerListener listener;
private Channel channel; private Channel channel;
private final boolean autoFlowControl;
private final int flowControlWindow; private final int flowControlWindow;
private final int maxMessageSize; private final int maxMessageSize;
private final int maxHeaderListSize; private final int maxHeaderListSize;
@ -106,7 +107,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
ProtocolNegotiator protocolNegotiator, ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories, List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory, TransportTracer.Factory transportTracerFactory,
int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int maxStreamsPerConnection, boolean autoFlowControl, int flowControlWindow,
int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos, long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
@ -127,6 +129,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory; this.transportTracerFactory = transportTracerFactory;
this.maxStreamsPerConnection = maxStreamsPerConnection; this.maxStreamsPerConnection = maxStreamsPerConnection;
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
@ -205,6 +208,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
streamTracerFactories, streamTracerFactories,
transportTracerFactory.create(), transportTracerFactory.create(),
maxStreamsPerConnection, maxStreamsPerConnection,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxMessageSize, maxMessageSize,
maxHeaderListSize, maxHeaderListSize,

View File

@ -94,6 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private SslContext sslContext; private SslContext sslContext;
private ProtocolNegotiator protocolNegotiator; private ProtocolNegotiator protocolNegotiator;
private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE; private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
private boolean autoFlowControl = false;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
@ -346,13 +347,27 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
} }
/** /**
* Sets the HTTP/2 flow control window. If not called, the default value * Sets the initial flow control window in bytes. Setting initial flow control window enables auto
* is {@link #DEFAULT_FLOW_CONTROL_WINDOW}). * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control
* tuning, use {@link #flowControlWindow(int)}.
*/
public NettyServerBuilder initialFlowControlWindow(int initialFlowControlWindow) {
checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive");
this.flowControlWindow = initialFlowControlWindow;
this.autoFlowControl = true;
return this;
}
/**
* Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control
* tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not
* called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}).
*/ */
public NettyServerBuilder flowControlWindow(int flowControlWindow) { public NettyServerBuilder flowControlWindow(int flowControlWindow) {
checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow); flowControlWindow);
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.autoFlowControl = false;
return this; return this;
} }
@ -564,8 +579,9 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
listenAddress, channelFactory, channelOptions, childChannelOptions, listenAddress, channelFactory, channelOptions, childChannelOptions,
bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator,
streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize,
keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos,
maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos,
getChannelz()); getChannelz());
transportServers.add(transportServer); transportServers.add(transportServer);

View File

@ -45,6 +45,7 @@ import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer; import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.grpc.netty.ListeningEncoder.ListeningDefaultHttp2ConnectionEncoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -54,7 +55,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2Headers;
@ -139,6 +139,7 @@ class NettyServerHandler extends AbstractNettyHandler {
List<? extends ServerStreamTracer.Factory> streamTracerFactories, List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, TransportTracer transportTracer,
int maxStreams, int maxStreams,
boolean autoFlowControl,
int flowControlWindow, int flowControlWindow,
int maxHeaderListSize, int maxHeaderListSize,
int maxMessageSize, int maxMessageSize,
@ -165,6 +166,7 @@ class NettyServerHandler extends AbstractNettyHandler {
streamTracerFactories, streamTracerFactories,
transportTracer, transportTracer,
maxStreams, maxStreams,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
maxMessageSize, maxMessageSize,
@ -186,6 +188,7 @@ class NettyServerHandler extends AbstractNettyHandler {
List<? extends ServerStreamTracer.Factory> streamTracerFactories, List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, TransportTracer transportTracer,
int maxStreams, int maxStreams,
boolean autoFlowControl,
int flowControlWindow, int flowControlWindow,
int maxHeaderListSize, int maxHeaderListSize,
int maxMessageSize, int maxMessageSize,
@ -217,7 +220,8 @@ class NettyServerHandler extends AbstractNettyHandler {
connection.local().flowController( connection.local().flowController(
new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); Http2ConnectionEncoder encoder =
new ListeningDefaultHttp2ConnectionEncoder(connection, frameWriter);
encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader); frameReader);
@ -238,7 +242,8 @@ class NettyServerHandler extends AbstractNettyHandler {
keepAliveTimeInNanos, keepAliveTimeoutInNanos, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer); keepAliveEnforcer,
autoFlowControl);
} }
private NettyServerHandler( private NettyServerHandler(
@ -256,8 +261,9 @@ class NettyServerHandler extends AbstractNettyHandler {
long maxConnectionIdleInNanos, long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos, long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer) { final KeepAliveEnforcer keepAliveEnforcer,
super(channelUnused, decoder, encoder, settings); boolean autoFlowControl) {
super(channelUnused, decoder, encoder, settings, autoFlowControl);
final MaxConnectionIdleManager maxConnectionIdleManager; final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {

View File

@ -64,6 +64,7 @@ class NettyServerTransport implements ServerTransport {
private NettyServerHandler grpcHandler; private NettyServerHandler grpcHandler;
private ServerTransportListener listener; private ServerTransportListener listener;
private boolean terminated; private boolean terminated;
private final boolean autoFlowControl;
private final int flowControlWindow; private final int flowControlWindow;
private final int maxMessageSize; private final int maxMessageSize;
private final int maxHeaderListSize; private final int maxHeaderListSize;
@ -84,6 +85,7 @@ class NettyServerTransport implements ServerTransport {
List<? extends ServerStreamTracer.Factory> streamTracerFactories, List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, TransportTracer transportTracer,
int maxStreams, int maxStreams,
boolean autoFlowControl,
int flowControlWindow, int flowControlWindow,
int maxMessageSize, int maxMessageSize,
int maxHeaderListSize, int maxHeaderListSize,
@ -101,6 +103,7 @@ class NettyServerTransport implements ServerTransport {
Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.maxStreams = maxStreams; this.maxStreams = maxStreams;
this.autoFlowControl = autoFlowControl;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
@ -121,7 +124,6 @@ class NettyServerTransport implements ServerTransport {
// Create the Netty handler for the pipeline. // Create the Netty handler for the pipeline.
grpcHandler = createHandler(listener, channelUnused); grpcHandler = createHandler(listener, channelUnused);
NettyHandlerSettings.setAutoWindow(grpcHandler);
// Notify when the channel closes. // Notify when the channel closes.
final class TerminationNotifier implements ChannelFutureListener { final class TerminationNotifier implements ChannelFutureListener {
@ -258,6 +260,7 @@ class NettyServerTransport implements ServerTransport {
streamTracerFactories, streamTracerFactories,
transportTracer, transportTracer,
maxStreams, maxStreams,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
maxMessageSize, maxMessageSize,

View File

@ -790,6 +790,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
frameWriter(), frameWriter(),
lifecycleManager, lifecycleManager,
mockKeepAliveManager, mockKeepAliveManager,
false,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
stopwatchSupplier, stopwatchSupplier,

View File

@ -191,7 +191,7 @@ public class NettyClientTransportTest {
channelOptions.put(ChannelOption.SO_LINGER, soLinger); channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false); new SocketPicker(), new FakeChannelLogger(), false);
@ -437,7 +437,7 @@ public class NettyClientTransportTest {
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class), address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
new HashMap<ChannelOption<?>, Object>(), group, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, newNegotiator(), false, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
new FakeChannelLogger(), false); new FakeChannelLogger(), false);
@ -752,7 +752,7 @@ public class NettyClientTransportTest {
} }
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, negotiator, false, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano, keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable, false, authority, userAgent, tooManyPingsRunnable,
new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger(), false); new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger(), false);
@ -774,6 +774,7 @@ public class NettyClientTransportTest {
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
maxStreamsPerConnection, maxStreamsPerConnection,
false,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
MAX_CONNECTION_IDLE_NANOS_DISABLED, MAX_CONNECTION_IDLE_NANOS_DISABLED,

View File

@ -101,6 +101,7 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
protected final TransportTracer transportTracer = new TransportTracer(); protected final TransportTracer transportTracer = new TransportTracer();
protected int flowControlWindow = DEFAULT_WINDOW_SIZE; protected int flowControlWindow = DEFAULT_WINDOW_SIZE;
protected boolean autoFlowControl = false;
private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeClock = new FakeClock();

View File

@ -1116,6 +1116,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
Arrays.asList(streamTracerFactory), Arrays.asList(streamTracerFactory),
transportTracer, transportTracer,
maxConcurrentStreams, maxConcurrentStreams,
autoFlowControl,
flowControlWindow, flowControlWindow,
maxHeaderListSize, maxHeaderListSize,
DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE,

View File

@ -100,6 +100,7 @@ public class NettyServerTest {
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
false, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
@ -146,6 +147,7 @@ public class NettyServerTest {
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
false, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
@ -186,6 +188,7 @@ public class NettyServerTest {
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
false, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
@ -238,6 +241,7 @@ public class NettyServerTest {
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
false, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore