netty: http2 server transport graceful shutdown sends 2 GOAWAYs

resolves #3442
This commit is contained in:
ZHANG Dapeng 2018-03-28 15:58:31 -07:00 committed by GitHub
parent 03a00aa8cf
commit bdecdaea22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 331 additions and 40 deletions

View File

@ -82,6 +82,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@ -96,6 +97,8 @@ import javax.annotation.Nullable;
class NettyServerHandler extends AbstractNettyHandler {
private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
private static final long KEEPALIVE_PING = 0xDEADL;
private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
@ -121,6 +124,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private MaxConnectionIdleManager maxConnectionIdleManager;
@CheckForNull
private ScheduledFuture<?> maxConnectionAgeMonitor;
@CheckForNull
private GracefulShutdown gracefulShutdown;
static NettyServerHandler newHandler(
ServerTransportListener transportListener,
@ -250,17 +255,10 @@ class NettyServerHandler extends AbstractNettyHandler {
maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) {
@Override
void close(ChannelHandlerContext ctx) {
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), "max_idle"),
ctx.newPromise());
ctx.flush();
try {
NettyServerHandler.this.close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_idle", null);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
};
@ -321,25 +319,10 @@ class NettyServerHandler extends AbstractNettyHandler {
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
// send GO_AWAY
ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age");
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
debugData,
ctx.newPromise());
// gracefully shutdown with specified grace time
long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis();
try {
gracefulShutdownTimeoutMillis(
TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos));
close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
} finally {
gracefulShutdownTimeoutMillis(savedGracefulShutdownTime);
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
gracefulShutdown.start(ctx);
ctx.flush();
}
}
}),
@ -787,6 +770,13 @@ class NettyServerHandler extends AbstractNettyHandler {
logger.log(Level.FINE, String.format("Window: %d",
decoder().flowController().initialWindowSize(connection().connectionStream())));
}
} else if (data == GRACEFUL_SHUTDOWN_PING) {
if (gracefulShutdown == null) {
// this should never happen
logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
} else {
gracefulShutdown.secondGoAwayAndClose(ctx);
}
} else if (data != KEEPALIVE_PING) {
logger.warning("Received unexpected ping ack. No ping outstanding");
}
@ -803,7 +793,6 @@ class NettyServerHandler extends AbstractNettyHandler {
@Override
public void ping() {
ChannelFuture pingFuture = encoder().writePing(
// slice KEEPALIVE_PING because tls handler may modify the reader index
ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
ctx.flush();
if (transportTracer != null) {
@ -837,6 +826,88 @@ class NettyServerHandler extends AbstractNettyHandler {
}
}
private final class GracefulShutdown {
String goAwayMessage;
/**
* The grace time between starting graceful shutdown and closing the netty channel,
* {@code null} is unspecified.
*/
@CheckForNull
Long graceTimeInNanos;
/**
* True if ping is Acked or ping is timeout.
*/
boolean pingAckedOrTimeout;
Future<?> pingFuture;
GracefulShutdown(String goAwayMessage,
@Nullable Long graceTimeInNanos) {
this.goAwayMessage = goAwayMessage;
this.graceTimeInNanos = graceTimeInNanos;
}
/**
* Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
*/
void start(final ChannelHandlerContext ctx) {
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
ctx.newPromise());
long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS;
pingFuture = ctx.executor().schedule(
new Runnable() {
@Override
public void run() {
secondGoAwayAndClose(ctx);
}
},
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
TimeUnit.NANOSECONDS);
encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
}
void secondGoAwayAndClose(ChannelHandlerContext ctx) {
if (pingAckedOrTimeout) {
return;
}
pingAckedOrTimeout = true;
checkNotNull(pingFuture, "pingFuture");
pingFuture.cancel(false);
// send the second GOAWAY with last stream id
goAway(
ctx,
connection().remote().lastStreamCreated(),
Http2Error.NO_ERROR.code(),
ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
ctx.newPromise());
// gracefully shutdown with specified grace time
long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis;
if (graceTimeInNanos != null) {
gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
}
try {
gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, /* outbound= */ true, e);
} finally {
gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
}
}
}
// Use a frame writer so that we know when frames are through flow control and actually being
// written.
private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {

View File

@ -697,24 +697,72 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
}
@Test
public void maxConnectionIdle_goAwaySent() throws Exception {
public void maxConnectionIdle_goAwaySent_pingAck() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionIdleInNanos);
// GO_AWAY sent
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionIdle_activeThenRst() throws Exception {
public void maxConnectionIdle_goAwaySent_pingTimeout() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionIdleInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
fakeClock().forwardTime(10, TimeUnit.SECONDS);
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionIdle_activeThenRst_pingAck() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
createStream();
@ -731,11 +779,64 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
fakeClock().forwardNanos(maxConnectionIdleInNanos);
// GO_AWAY sent
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
fakeClock().forwardTime(10, TimeUnit.SECONDS);
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionIdle_activeThenRst_pingTimeoutk() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
createStream();
fakeClock().forwardNanos(maxConnectionIdleInNanos);
// GO_AWAY not sent when active
verifyWrite(never()).writeGoAway(
any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code()));
fakeClock().forwardNanos(maxConnectionIdleInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@ -755,18 +856,68 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
}
@Test
public void maxConnectionAge_goAwaySent() throws Exception {
public void maxConnectionAge_goAwaySent_pingAck() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionAgeInNanos);
// GO_AWAY sent
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionAgeInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
fakeClock().forwardTime(10, TimeUnit.SECONDS);
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@ -780,31 +931,100 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
fakeClock().forwardNanos(maxConnectionAgeInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
fakeClock().forwardTime(20, TimeUnit.MINUTES);
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel not closed yet
assertTrue(channel().isOpen());
}
@Test
public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception {
public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout()
throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L);
maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout
manualSetUp();
createStream();
fakeClock().forwardNanos(maxConnectionAgeInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
fakeClock().forwardNanos(TimeUnit.SECONDS.toNanos(10));
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - 2);
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionAgeGraceInNanos);
fakeClock().forwardTime(2, TimeUnit.MILLISECONDS);
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck()
throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout
manualSetUp();
createStream();
fakeClock().forwardNanos(maxConnectionAgeInNanos);
// first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// ping sent
verifyWrite().writePing(
eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
verifyWrite(never()).writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
long pingRoundTripMillis = 100; // less than ping timeout
fakeClock().forwardTime(pingRoundTripMillis, TimeUnit.MILLISECONDS);
channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
// second GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2));
assertTrue(channel().isOpen());
fakeClock().forwardTime(2, TimeUnit.MILLISECONDS);
// channel closed
assertTrue(!channel().isOpen());