mirror of https://github.com/grpc/grpc-java.git
netty: always use double goaway for shutdown
This commit is contained in:
parent
8d429ab056
commit
e5008e53fe
|
|
@ -98,7 +98,8 @@ import javax.annotation.Nullable;
|
||||||
class NettyServerHandler extends AbstractNettyHandler {
|
class NettyServerHandler extends AbstractNettyHandler {
|
||||||
private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
|
private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
|
||||||
private static final long KEEPALIVE_PING = 0xDEADL;
|
private static final long KEEPALIVE_PING = 0xDEADL;
|
||||||
private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
|
@VisibleForTesting
|
||||||
|
static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
|
||||||
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
|
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
|
||||||
|
|
||||||
private final Http2Connection.PropertyKey streamKey;
|
private final Http2Connection.PropertyKey streamKey;
|
||||||
|
|
@ -586,6 +587,15 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||||
|
if (gracefulShutdown == null) {
|
||||||
|
gracefulShutdown = new GracefulShutdown("app_requested", null);
|
||||||
|
gracefulShutdown.start(ctx);
|
||||||
|
ctx.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the given processed bytes back to inbound flow control.
|
* Returns the given processed bytes back to inbound flow control.
|
||||||
*/
|
*/
|
||||||
|
|
@ -648,7 +658,7 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
|
|
||||||
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
|
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
|
||||||
ChannelPromise promise) throws Exception {
|
ChannelPromise promise) throws Exception {
|
||||||
close(ctx, promise);
|
super.close(ctx, promise);
|
||||||
connection().forEachActiveStream(new Http2StreamVisitor() {
|
connection().forEachActiveStream(new Http2StreamVisitor() {
|
||||||
@Override
|
@Override
|
||||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||||
|
|
@ -906,7 +916,7 @@ class NettyServerHandler extends AbstractNettyHandler {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
|
gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
|
||||||
close(ctx, ctx.newPromise());
|
NettyServerHandler.super.close(ctx, ctx.newPromise());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
onError(ctx, /* outbound= */ true, e);
|
onError(ctx, /* outbound= */ true, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,7 @@ import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
|
@ -71,7 +72,6 @@ import io.grpc.internal.testing.TestServerStreamTracer;
|
||||||
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
|
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.ByteBufUtil;
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import io.netty.buffer.Unpooled;
|
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
|
@ -331,12 +331,21 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void closeShouldCloseChannel() throws Exception {
|
public void closeShouldGracefullyCloseChannel() throws Exception {
|
||||||
manualSetUp();
|
manualSetUp();
|
||||||
handler().close(ctx(), newPromise());
|
handler().close(ctx(), newPromise());
|
||||||
|
|
||||||
|
verifyWrite().writeGoAway(eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()),
|
||||||
|
isA(ByteBuf.class), any(ChannelPromise.class));
|
||||||
|
verifyWrite().writePing(
|
||||||
|
eq(ctx()),
|
||||||
|
eq(false),
|
||||||
|
eq(NettyServerHandler.GRACEFUL_SHUTDOWN_PING),
|
||||||
|
isA(ChannelPromise.class));
|
||||||
|
channelRead(pingFrame(/*ack=*/ true , NettyServerHandler.GRACEFUL_SHUTDOWN_PING));
|
||||||
|
|
||||||
verifyWrite().writeGoAway(eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()),
|
verifyWrite().writeGoAway(eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()),
|
||||||
eq(Unpooled.EMPTY_BUFFER), any(ChannelPromise.class));
|
isA(ByteBuf.class), any(ChannelPromise.class));
|
||||||
|
|
||||||
// Verify that the channel was closed.
|
// Verify that the channel was closed.
|
||||||
assertFalse(channel().isOpen());
|
assertFalse(channel().isOpen());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue