netty: Allow protocol negotiators to shut down transport, with grace period

This will be used for draining old connections when xDS configuration
changes.
This commit is contained in:
Eric Anderson 2021-09-14 10:49:48 -07:00
parent 122b3b2f7c
commit 5307b69c9e
5 changed files with 160 additions and 5 deletions

View File

@ -0,0 +1,53 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
/**
* A command to trigger close and allow streams naturally close.
*/
class GracefulServerCloseCommand extends WriteQueue.AbstractQueuedCommand {
private final String goAwayDebugString;
private final long graceTime;
private final TimeUnit graceTimeUnit;
public GracefulServerCloseCommand(String goAwayDebugString) {
this(goAwayDebugString, -1, null);
}
public GracefulServerCloseCommand(
String goAwayDebugString, long graceTime, TimeUnit graceTimeUnit) {
this.goAwayDebugString = Preconditions.checkNotNull(goAwayDebugString, "goAwayDebugString");
this.graceTime = graceTime;
this.graceTimeUnit = graceTimeUnit;
}
public String getGoAwayDebugString() {
return goAwayDebugString;
}
/** Has no meaning if {@code getGraceTimeUnit() == null}. */
public long getGraceTime() {
return graceTime;
}
public TimeUnit getGraceTimeUnit() {
return graceTimeUnit;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import io.grpc.Internal;
import java.util.concurrent.TimeUnit;
/**
* Internal accessor for {@link GracefulServerCloseCommand}.
*/
@Internal
public final class InternalGracefulServerCloseCommand {
private InternalGracefulServerCloseCommand() {}
public static Object create(String goAwayDebugString) {
return new GracefulServerCloseCommand(goAwayDebugString);
}
public static Object create(String goAwayDebugString, long graceTime, TimeUnit graceTimeUnit) {
return new GracefulServerCloseCommand(goAwayDebugString, graceTime, graceTimeUnit);
}
}

View File

@ -641,6 +641,8 @@ class NettyServerHandler extends AbstractNettyHandler {
sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
} else if (msg instanceof CancelServerStreamCommand) {
cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
} else if (msg instanceof GracefulServerCloseCommand) {
gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
} else if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
} else {
@ -654,11 +656,8 @@ class NettyServerHandler extends AbstractNettyHandler {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (gracefulShutdown == null) {
gracefulShutdown = new GracefulShutdown("app_requested", null);
gracefulShutdown.start(ctx);
ctx.flush();
}
gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
ctx.flush();
}
/**
@ -739,6 +738,21 @@ class NettyServerHandler extends AbstractNettyHandler {
}
}
private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
ChannelPromise promise) throws Exception {
// Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
// requested here. But that's an edge case and seems bug-prone.
if (gracefulShutdown == null) {
Long graceTimeInNanos = null;
if (msg.getGraceTimeUnit() != null) {
graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
}
gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
gracefulShutdown.start(ctx);
}
promise.setSuccess();
}
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
ChannelPromise promise) throws Exception {
super.close(ctx, promise);

View File

@ -124,6 +124,8 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
promise.setFailure(failCause);
ReferenceCountUtil.release(msg);
} else {
// Do not special case GracefulServerCloseCommand, as we don't want to cause handshake
// failures.
if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) {
// No point in continuing negotiation
ctx.close();

View File

@ -350,6 +350,56 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertFalse(channel().isOpen());
}
@Test
public void gracefulCloseShouldGracefullyCloseChannel() throws Exception {
manualSetUp();
handler()
.write(ctx(), new GracefulServerCloseCommand("test", 1, TimeUnit.MINUTES), newPromise());
verifyWrite().writeGoAway(eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()),
isA(ByteBuf.class), any(ChannelPromise.class));
verifyWrite().writePing(
eq(ctx()),
eq(false),
eq(NettyServerHandler.GRACEFUL_SHUTDOWN_PING),
isA(ChannelPromise.class));
channelRead(pingFrame(/*ack=*/ true , NettyServerHandler.GRACEFUL_SHUTDOWN_PING));
verifyWrite().writeGoAway(eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()),
isA(ByteBuf.class), any(ChannelPromise.class));
// Verify that the channel was closed.
assertFalse(channel().isOpen());
}
@Test
public void secondGracefulCloseIsSafe() throws Exception {
manualSetUp();
handler().write(ctx(), new GracefulServerCloseCommand("test"), newPromise());
verifyWrite().writeGoAway(eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()),
isA(ByteBuf.class), any(ChannelPromise.class));
verifyWrite().writePing(
eq(ctx()),
eq(false),
eq(NettyServerHandler.GRACEFUL_SHUTDOWN_PING),
isA(ChannelPromise.class));
handler().write(ctx(), new GracefulServerCloseCommand("test2"), newPromise());
channel().runPendingTasks();
// No additional GOAWAYs.
verifyWrite().writeGoAway(any(ChannelHandlerContext.class), any(Integer.class), any(Long.class),
any(ByteBuf.class), any(ChannelPromise.class));
channel().checkException();
assertTrue(channel().isOpen());
channelRead(pingFrame(/*ack=*/ true , NettyServerHandler.GRACEFUL_SHUTDOWN_PING));
verifyWrite().writeGoAway(eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()),
isA(ByteBuf.class), any(ChannelPromise.class));
assertFalse(channel().isOpen());
}
@Test
public void exceptionCaughtShouldCloseConnection() throws Exception {
manualSetUp();