From 00e2d717a2d0a8c898357a5894c0f614dda50cb6 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 21 Sep 2020 15:52:50 -0700 Subject: [PATCH] netty: BDP ping accounting should occur after flow control It's hoped that this resolves the "too_many_pings" issue some users are seeing that is worked around by GRPC_EXPERIMENTAL_AUTOFLOWCONTROL=false. This change also avoids resetting the ping count for empty data frames (which shouldn't really happen with gRPC). The previous code failed to reset the ping count on HEADERS and WINDOW_UPDATE. The code _appeared_ to have callbacks for WINDOW_UPDATE, but was layered above the Http2Connection so was never called. Thus, this version is much more aggressive then the previous version while also addressing the correctness issue. --- .../io/grpc/netty/AbstractNettyHandler.java | 54 +++---- .../java/io/grpc/netty/ListeningEncoder.java | 136 ------------------ .../io/grpc/netty/NettyClientHandler.java | 74 +++++++++- .../io/grpc/netty/NettyServerHandler.java | 6 +- .../io/grpc/netty/NettyClientHandlerTest.java | 62 ++++++++ .../io/grpc/netty/NettyHandlerTestBase.java | 2 +- 6 files changed, 155 insertions(+), 179 deletions(-) delete mode 100644 netty/src/main/java/io/grpc/netty/ListeningEncoder.java diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index 6fcd03fdc4..4ab88f9efc 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -16,12 +16,10 @@ package io.grpc.netty; -import static com.google.common.base.Preconditions.checkArgument; import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import com.google.common.annotations.VisibleForTesting; -import io.grpc.netty.ListeningEncoder.Http2OutboundFrameListener; -import io.netty.buffer.ByteBuf; +import com.google.common.base.Preconditions; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ConnectionDecoder; @@ -38,11 +36,9 @@ import java.util.concurrent.TimeUnit; */ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; - private static final int MAX_ALLOWED_PING = 2; private final int initialConnectionWindow; - private final PingCountingListener pingCountingListener = new PingCountingListener(); - private final FlowControlPinger flowControlPing = new FlowControlPinger(MAX_ALLOWED_PING); + private final FlowControlPinger flowControlPing; private boolean autoTuneFlowControlOn; private ChannelHandlerContext ctx; @@ -55,7 +51,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings, - boolean autoFlowControl) { + boolean autoFlowControl, + PingLimiter pingLimiter) { super(channelUnused, decoder, encoder, initialSettings); // During a graceful shutdown, wait until all streams are closed. @@ -65,9 +62,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : initialSettings.initialWindowSize(); this.autoTuneFlowControlOn = autoFlowControl; - if (encoder instanceof ListeningEncoder) { - ((ListeningEncoder) encoder).setListener(pingCountingListener); + if (pingLimiter == null) { + pingLimiter = new AllowPingLimiter(); } + this.flowControlPing = new FlowControlPinger(pingLimiter); } @Override @@ -131,7 +129,8 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { final class FlowControlPinger { private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; - private final int maxAllowedPing; + + private final PingLimiter pingLimiter; private int pingCount; private int pingReturn; private boolean pinging; @@ -139,9 +138,9 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private float lastBandwidth; // bytes per second private long lastPingTime; - public FlowControlPinger(int maxAllowedPing) { - checkArgument(maxAllowedPing > 0, "maxAllowedPing must be positive"); - this.maxAllowedPing = maxAllowedPing; + public FlowControlPinger(PingLimiter pingLimiter) { + Preconditions.checkNotNull(pingLimiter, "pingLimiter"); + this.pingLimiter = pingLimiter; } public long payload() { @@ -156,7 +155,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { if (!autoTuneFlowControlOn) { return; } - if (!isPinging() && pingCountingListener.pingCount < maxAllowedPing) { + if (!isPinging() && pingLimiter.isPingAllowed()) { setPinging(true); sendPing(ctx()); } @@ -235,27 +234,14 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { } } - private static class PingCountingListener extends Http2OutboundFrameListener { - int pingCount = 0; + /** Controls whether PINGs like those for BDP are permitted to be sent at the current time. */ + public interface PingLimiter { + public boolean isPingAllowed(); + } - @Override - public void onWindowUpdate(int streamId, int windowSizeIncrement) { - pingCount = 0; - super.onWindowUpdate(streamId, windowSizeIncrement); - } - - @Override - public void onPing(boolean ack, long data) { - if (!ack) { - pingCount++; - } - super.onPing(ack, data); - } - - @Override - public void onData(int streamId, ByteBuf data, int padding, boolean endStream) { - pingCount = 0; - super.onData(streamId, data, padding, endStream); + private static final class AllowPingLimiter implements PingLimiter { + @Override public boolean isPingAllowed() { + return true; } } } diff --git a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java b/netty/src/main/java/io/grpc/netty/ListeningEncoder.java deleted file mode 100644 index 03270d399a..0000000000 --- a/netty/src/main/java/io/grpc/netty/ListeningEncoder.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2020 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import static com.google.common.base.Preconditions.checkNotNull; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2ConnectionEncoder; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.StreamBufferingEncoder; - -/** A ListeningEncoder notifies {@link Http2OutboundFrameListener} on http2 outbound frame event. */ -interface ListeningEncoder { - - void setListener(Http2OutboundFrameListener listener); - - /** - * Partial implementation of (Listening subset of event) event listener for outbound http2 - * frames. - */ - class Http2OutboundFrameListener { - - /** Notifies on outbound WINDOW_UPDATE frame. */ - public void onWindowUpdate(int streamId, int windowSizeIncrement) {} - - /** Notifies on outbound PING frame. */ - public void onPing(boolean ack, long data) {} - - /** Notifies on outbound DATA frame. */ - public void onData(int streamId, ByteBuf data, int padding, boolean endStream) {} - } - - /** A {@link StreamBufferingEncoder} notifies http2 outbound frame event. */ - final class ListeningStreamBufferingEncoder - extends StreamBufferingEncoder implements ListeningEncoder { - - private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); - - public ListeningStreamBufferingEncoder(Http2ConnectionEncoder encoder) { - super(encoder); - } - - @Override - public void setListener(Http2OutboundFrameListener listener) { - this.listener = checkNotNull(listener, "listener"); - } - - @Override - public ChannelFuture writePing( - ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { - listener.onPing(ack, data); - return super.writePing(ctx, ack, data, promise); - } - - @Override - public ChannelFuture writeWindowUpdate( - ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { - listener.onWindowUpdate(streamId, windowSizeIncrement); - return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); - } - - @Override - public ChannelFuture writeData( - ChannelHandlerContext ctx, - int streamId, - ByteBuf data, - int padding, - boolean eos, - ChannelPromise promise) { - listener.onData(streamId, data, padding, eos); - return super.writeData(ctx, streamId, data, padding, eos, promise); - } - } - - /** A {@link DefaultHttp2ConnectionEncoder} notifies http2 outbound frame event. */ - final class ListeningDefaultHttp2ConnectionEncoder - extends DefaultHttp2ConnectionEncoder implements ListeningEncoder { - - private Http2OutboundFrameListener listener = new Http2OutboundFrameListener(); - - public ListeningDefaultHttp2ConnectionEncoder( - Http2Connection connection, Http2FrameWriter frameWriter) { - super(connection, frameWriter); - } - - @Override - public void setListener(Http2OutboundFrameListener listener) { - this.listener = checkNotNull(listener, "listener"); - } - - @Override - public ChannelFuture writePing( - ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { - listener.onPing(ack, data); - return super.writePing(ctx, ack, data, promise); - } - - @Override - public ChannelFuture writeWindowUpdate( - ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { - listener.onWindowUpdate(streamId, windowSizeIncrement); - return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); - } - - @Override - public ChannelFuture writeData( - ChannelHandlerContext ctx, - int streamId, - ByteBuf data, - int padding, - boolean eos, - ChannelPromise promise) { - listener.onData(streamId, data, padding, eos); - return super.writeData(ctx, streamId, data, padding, eos, promise); - } - } -} diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index f02b885c2b..61052bd190 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -38,7 +38,6 @@ import io.grpc.internal.InUseStateAggregator; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; -import io.grpc.netty.ListeningEncoder.ListeningStreamBufferingEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -47,6 +46,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; @@ -197,8 +197,11 @@ class NettyClientHandler extends AbstractNettyHandler { frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); + PingCountingFrameWriter pingCounter; + frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter); + StreamBufferingEncoder encoder = - new ListeningStreamBufferingEncoder( + new StreamBufferingEncoder( new DefaultHttp2ConnectionEncoder(connection, frameWriter)); // Create the local flow controller configured to auto-refill the connection window. @@ -237,7 +240,8 @@ class NettyClientHandler extends AbstractNettyHandler { transportTracer, eagAttributes, authority, - autoFlowControl); + autoFlowControl, + pingCounter); } private NettyClientHandler( @@ -251,8 +255,9 @@ class NettyClientHandler extends AbstractNettyHandler { TransportTracer transportTracer, Attributes eagAttributes, String authority, - boolean autoFlowControl) { - super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl); + boolean autoFlowControl, + PingLimiter pingLimiter) { + super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl, pingLimiter); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; @@ -912,4 +917,63 @@ class NettyClientHandler extends AbstractNettyHandler { } } } + + private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter + implements AbstractNettyHandler.PingLimiter { + private int pingCount; + + public PingCountingFrameWriter(Http2FrameWriter delegate) { + super(delegate); + } + + @Override + public boolean isPingAllowed() { + // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below. + return pingCount < 2; + } + + @Override + public ChannelFuture writeHeaders( + ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int padding, boolean endStream, ChannelPromise promise) { + pingCount = 0; + return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); + } + + @Override + public ChannelFuture writeHeaders( + ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int streamDependency, short weight, boolean exclusive, + int padding, boolean endStream, ChannelPromise promise) { + pingCount = 0; + return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, + padding, endStream, promise); + } + + @Override + public ChannelFuture writeWindowUpdate( + ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { + pingCount = 0; + return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); + } + + @Override + public ChannelFuture writePing( + ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { + if (!ack) { + pingCount++; + } + return super.writePing(ctx, ack, data, promise); + } + + @Override + public ChannelFuture writeData( + ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, + ChannelPromise promise) { + if (data.isReadable()) { + pingCount = 0; + } + return super.writeData(ctx, streamId, data, padding, endStream, promise); + } + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index b40ee7bd45..14ce6ef617 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -45,7 +45,6 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; -import io.grpc.netty.ListeningEncoder.ListeningDefaultHttp2ConnectionEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; @@ -55,6 +54,7 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; @@ -221,7 +221,7 @@ class NettyServerHandler extends AbstractNettyHandler { new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); Http2ConnectionEncoder encoder = - new ListeningDefaultHttp2ConnectionEncoder(connection, frameWriter); + new DefaultHttp2ConnectionEncoder(connection, frameWriter); encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); @@ -263,7 +263,7 @@ class NettyServerHandler extends AbstractNettyHandler { long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer, boolean autoFlowControl) { - super(channelUnused, decoder, encoder, settings, autoFlowControl); + super(channelUnused, decoder, encoder, settings, autoFlowControl, null); final MaxConnectionIdleManager maxConnectionIdleManager; if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 17586ef925..0ff2d98141 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -713,6 +713,68 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { protected final ByteBuf windowUpdate(int streamId, int delta) { ChannelHandlerContext ctx = newMockContext(); - new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, 0, delta, newPromise()); + new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, streamId, delta, newPromise()); return captureWrite(ctx); }