netty: BDP ping accounting should occur after flow control

It's hoped that this resolves the "too_many_pings" issue some users are
seeing that is worked around by GRPC_EXPERIMENTAL_AUTOFLOWCONTROL=false.
This change also avoids resetting the ping count for empty data frames
(which shouldn't really happen with gRPC).

The previous code failed to reset the ping count on HEADERS and
WINDOW_UPDATE. The code _appeared_ to have callbacks for WINDOW_UPDATE,
but was layered above the Http2Connection so was never called. Thus,
this version is much more aggressive then the previous version while
also addressing the correctness issue.
This commit is contained in:
Eric Anderson 2020-09-21 15:52:50 -07:00 committed by Eric Anderson
parent 4c1bab9ed5
commit 00e2d717a2
6 changed files with 155 additions and 179 deletions

View File

@ -16,12 +16,10 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener;
import io.netty.buffer.ByteBuf;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
@ -38,11 +36,9 @@ import java.util.concurrent.TimeUnit;
*/
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
private static final int MAX_ALLOWED_PING = 2;
private final int initialConnectionWindow;
private final PingCountingListener pingCountingListener = new PingCountingListener();
private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING);
private final FlowControlPinger flowControlPing;
private boolean autoTuneFlowControlOn;
private ChannelHandlerContext ctx;
@ -55,7 +51,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings,
boolean autoFlowControl) {
boolean autoFlowControl,
PingLimiter pingLimiter) {
super(channelUnused, decoder, encoder, initialSettings);
// During a graceful shutdown, wait until all streams are closed.
@ -65,9 +62,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
this.autoTuneFlowControlOn = autoFlowControl;
if (encoder instanceof ListeningEncoder) {
((ListeningEncoder) encoder).setListener(pingCountingListener);
if (pingLimiter == null) {
pingLimiter = new AllowPingLimiter();
}
this.flowControlPing = new FlowControlPinger(pingLimiter);
}
@Override
@ -131,7 +129,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
final class FlowControlPinger {
private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
private final int maxAllowedPing;
private final PingLimiter pingLimiter;
private int pingCount;
private int pingReturn;
private boolean pinging;
@ -139,9 +138,9 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private float lastBandwidth; // bytes per second
private long lastPingTime;
public FlowControlPinger(int maxAllowedPing) {
checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive");
this.maxAllowedPing = maxAllowedPing;
public FlowControlPinger(PingLimiter pingLimiter) {
Preconditions.checkNotNull(pingLimiter, "pingLimiter");
this.pingLimiter = pingLimiter;
}
public long payload() {
@ -156,7 +155,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
if (!autoTuneFlowControlOn) {
return;
}
if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) {
if (!isPinging() && pingLimiter.isPingAllowed()) {
setPinging(true);
sendPing(ctx());
}
@ -235,27 +234,14 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
}
}
private static class PingCountingListener extends Http2OutboundFrameListener {
int pingCount = 0;
/** Controls whether PINGs like those for BDP are permitted to be sent at the current time. */
public interface PingLimiter {
public boolean isPingAllowed();
}
@Override
public void onWindowUpdate(int streamId, int windowSizeIncrement) {
pingCount = 0;
super.onWindowUpdate(streamId, windowSizeIncrement);
}
@Override
public void onPing(boolean ack, long data) {
if (!ack) {
pingCount++;
}
super.onPing(ack, data);
}
@Override
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {
pingCount = 0;
super.onData(streamId, data, padding, endStream);
private static final class AllowPingLimiter implements PingLimiter {
@Override public boolean isPingAllowed() {
return true;
}
}
}

View File

@ -1,136 +0,0 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.StreamBufferingEncoder;
/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */
interface ListeningEncoder {
void setListener(Http2OutboundFrameListener listener);
/**
* Partial implementation of (Listening subset of event) event listener for outbound http2
* frames.
*/
class Http2OutboundFrameListener {
/** Notifies on outbound WINDOW_UPDATE frame. */
public void onWindowUpdate(int streamId, int windowSizeIncrement) {}
/** Notifies on outbound PING frame. */
public void onPing(boolean ack, long data) {}
/** Notifies on outbound DATA frame. */
public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {}
}
/** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */
final class ListeningStreamBufferingEncoder
extends StreamBufferingEncoder implements ListeningEncoder {
private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();
public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) {
super(encoder);
}
@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}
@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}
/** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */
final class ListeningDefaultHttp2ConnectionEncoder
extends DefaultHttp2ConnectionEncoder implements ListeningEncoder {
private Http2OutboundFrameListener listener = new Http2OutboundFrameListener();
public ListeningDefaultHttp2ConnectionEncoder(
Http2Connection connection, Http2FrameWriter frameWriter) {
super(connection, frameWriter);
}
@Override
public void setListener(Http2OutboundFrameListener listener) {
this.listener = checkNotNull(listener, "listener");
}
@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
listener.onPing(ack, data);
return super.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
listener.onWindowUpdate(streamId, windowSizeIncrement);
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean eos,
ChannelPromise promise) {
listener.onData(streamId, data, padding, eos);
return super.writeData(ctx, streamId, data, padding, eos, promise);
}
}
}

View File

@ -38,7 +38,6 @@ import io.grpc.internal.InUseStateAggregator;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.grpc.netty.ListeningEncoder.ListeningStreamBufferingEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@ -47,6 +46,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
@ -197,8 +197,11 @@ class NettyClientHandler extends AbstractNettyHandler {
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
PingCountingFrameWriter pingCounter;
frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
StreamBufferingEncoder encoder =
new ListeningStreamBufferingEncoder(
new StreamBufferingEncoder(
new DefaultHttp2ConnectionEncoder(connection, frameWriter));
// Create the local flow controller configured to auto-refill the connection window.
@ -237,7 +240,8 @@ class NettyClientHandler extends AbstractNettyHandler {
transportTracer,
eagAttributes,
authority,
autoFlowControl);
autoFlowControl,
pingCounter);
}
private NettyClientHandler(
@ -251,8 +255,9 @@ class NettyClientHandler extends AbstractNettyHandler {
TransportTracer transportTracer,
Attributes eagAttributes,
String authority,
boolean autoFlowControl) {
super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl);
boolean autoFlowControl,
PingLimiter pingLimiter) {
super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl, pingLimiter);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
@ -912,4 +917,63 @@ class NettyClientHandler extends AbstractNettyHandler {
}
}
}
private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
implements AbstractNettyHandler.PingLimiter {
private int pingCount;
public PingCountingFrameWriter(Http2FrameWriter delegate) {
super(delegate);
}
@Override
public boolean isPingAllowed() {
// "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
return pingCount < 2;
}
@Override
public ChannelFuture writeHeaders(
ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, ChannelPromise promise) {
pingCount = 0;
return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(
ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive,
int padding, boolean endStream, ChannelPromise promise) {
pingCount = 0;
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
padding, endStream, promise);
}
@Override
public ChannelFuture writeWindowUpdate(
ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
pingCount = 0;
return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writePing(
ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
if (!ack) {
pingCount++;
}
return super.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writeData(
ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
ChannelPromise promise) {
if (data.isReadable()) {
pingCount = 0;
}
return super.writeData(ctx, streamId, data, padding, endStream, promise);
}
}
}

View File

@ -45,7 +45,6 @@ import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.grpc.netty.ListeningEncoder.ListeningDefaultHttp2ConnectionEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
@ -55,6 +54,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
@ -221,7 +221,7 @@ class NettyServerHandler extends AbstractNettyHandler {
new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder =
new ListeningDefaultHttp2ConnectionEncoder(connection, frameWriter);
new DefaultHttp2ConnectionEncoder(connection, frameWriter);
encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);
@ -263,7 +263,7 @@ class NettyServerHandler extends AbstractNettyHandler {
long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer,
boolean autoFlowControl) {
super(channelUnused, decoder, encoder, settings, autoFlowControl);
super(channelUnused, decoder, encoder, settings, autoFlowControl, null);
final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {

View File

@ -713,6 +713,68 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@Test
public void bdpPingAvoidsTooManyPingsOnSpecialServers() throws Exception {
// gRPC servers limit PINGs based on what they _send_. Some servers limit PINGs based on what is
// _received_.
createStream();
handler().setAutoTuneFlowControl(true);
Http2Headers headers = new DefaultHttp2Headers().status(STATUS_OK)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
channelRead(headersFrame(3, headers));
channelRead(dataFrame(3, false, content()));
verifyWrite().writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
channelRead(pingFrame(true, 1234));
channelRead(dataFrame(3, false, content()));
verifyWrite(times(2)).writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
channelRead(pingFrame(true, 1234));
channelRead(dataFrame(3, false, content()));
// No ping was sent
verifyWrite(times(2)).writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
}
@Test
public void bdpPingAllowedAfterSendingData() throws Exception {
// gRPC servers limit PINGs based on what they _send_. Some servers limit PINGs based on what is
// _received_.
flowControlWindow = 64 * 1024;
manualSetUp();
createStream();
handler().setAutoTuneFlowControl(true);
ByteBuf content = Unpooled.buffer(64 * 1024 + 1024);
content.writerIndex(content.capacity());
ChannelFuture future
= enqueue(new SendGrpcFrameCommand(streamTransportState, content, false));
assertFalse(future.isDone()); // flow control limits send
Http2Headers headers = new DefaultHttp2Headers().status(STATUS_OK)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
channelRead(headersFrame(3, headers));
channelRead(dataFrame(3, false, content()));
verifyWrite().writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
channelRead(pingFrame(true, 1234));
channelRead(dataFrame(3, false, content()));
verifyWrite(times(2)).writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
channelRead(pingFrame(true, 1234));
channelRead(dataFrame(3, false, content()));
// No ping was sent
verifyWrite(times(2)).writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
channelRead(windowUpdate(0, 2024));
channelRead(windowUpdate(3, 2024));
assertTrue(future.isDone());
assertTrue(future.isSuccess());
// But now one is sent
channelRead(dataFrame(3, false, content()));
verifyWrite(times(3)).writePing(eq(ctx()), eq(false), eq(1234L), any(ChannelPromise.class));
}
@Override
public void dataPingAckIsRecognized() throws Exception {
super.dataPingAckIsRecognized();

View File

@ -316,7 +316,7 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
protected final ByteBuf windowUpdate(int streamId, int delta) {
ChannelHandlerContext ctx = newMockContext();
new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, 0, delta, newPromise());
new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, streamId, delta, newPromise());
return captureWrite(ctx);
}