From 5307b69c9e5701bfe8ca5c304f3c44b8a44b50a8 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 14 Sep 2021 10:49:48 -0700 Subject: [PATCH] netty: Allow protocol negotiators to shut down transport, with grace period This will be used for draining old connections when xDS configuration changes. --- .../netty/GracefulServerCloseCommand.java | 53 +++++++++++++++++++ .../InternalGracefulServerCloseCommand.java | 36 +++++++++++++ .../io/grpc/netty/NettyServerHandler.java | 24 +++++++-- .../WriteBufferingAndExceptionHandler.java | 2 + .../io/grpc/netty/NettyServerHandlerTest.java | 50 +++++++++++++++++ 5 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 netty/src/main/java/io/grpc/netty/GracefulServerCloseCommand.java create mode 100644 netty/src/main/java/io/grpc/netty/InternalGracefulServerCloseCommand.java diff --git a/netty/src/main/java/io/grpc/netty/GracefulServerCloseCommand.java b/netty/src/main/java/io/grpc/netty/GracefulServerCloseCommand.java new file mode 100644 index 0000000000..9790468754 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/GracefulServerCloseCommand.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; + +/** + * A command to trigger close and allow streams naturally close. + */ +class GracefulServerCloseCommand extends WriteQueue.AbstractQueuedCommand { + private final String goAwayDebugString; + private final long graceTime; + private final TimeUnit graceTimeUnit; + + public GracefulServerCloseCommand(String goAwayDebugString) { + this(goAwayDebugString, -1, null); + } + + public GracefulServerCloseCommand( + String goAwayDebugString, long graceTime, TimeUnit graceTimeUnit) { + this.goAwayDebugString = Preconditions.checkNotNull(goAwayDebugString, "goAwayDebugString"); + this.graceTime = graceTime; + this.graceTimeUnit = graceTimeUnit; + } + + public String getGoAwayDebugString() { + return goAwayDebugString; + } + + /** Has no meaning if {@code getGraceTimeUnit() == null}. */ + public long getGraceTime() { + return graceTime; + } + + public TimeUnit getGraceTimeUnit() { + return graceTimeUnit; + } +} diff --git a/netty/src/main/java/io/grpc/netty/InternalGracefulServerCloseCommand.java b/netty/src/main/java/io/grpc/netty/InternalGracefulServerCloseCommand.java new file mode 100644 index 0000000000..deb72373ac --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/InternalGracefulServerCloseCommand.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.netty; + +import io.grpc.Internal; +import java.util.concurrent.TimeUnit; + +/** + * Internal accessor for {@link GracefulServerCloseCommand}. + */ +@Internal +public final class InternalGracefulServerCloseCommand { + private InternalGracefulServerCloseCommand() {} + + public static Object create(String goAwayDebugString) { + return new GracefulServerCloseCommand(goAwayDebugString); + } + + public static Object create(String goAwayDebugString, long graceTime, TimeUnit graceTimeUnit) { + return new GracefulServerCloseCommand(goAwayDebugString, graceTime, graceTimeUnit); + } +} diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 91f8f556f8..c286c17f64 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -641,6 +641,8 @@ class NettyServerHandler extends AbstractNettyHandler { sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise); } else if (msg instanceof CancelServerStreamCommand) { cancelStream(ctx, (CancelServerStreamCommand) msg, promise); + } else if (msg instanceof GracefulServerCloseCommand) { + gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise); } else if (msg instanceof ForcefulCloseCommand) { forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); } else { @@ -654,11 +656,8 @@ class NettyServerHandler extends AbstractNettyHandler { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - if (gracefulShutdown == null) { - gracefulShutdown = new GracefulShutdown("app_requested", null); - gracefulShutdown.start(ctx); - ctx.flush(); - } + gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise); + ctx.flush(); } /** @@ -739,6 +738,21 @@ class NettyServerHandler extends AbstractNettyHandler { } } + private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg, + ChannelPromise promise) throws Exception { + // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is + // requested here. But that's an edge case and seems bug-prone. + if (gracefulShutdown == null) { + Long graceTimeInNanos = null; + if (msg.getGraceTimeUnit() != null) { + graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime()); + } + gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos); + gracefulShutdown.start(ctx); + } + promise.setSuccess(); + } + private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise) throws Exception { super.close(ctx, promise); diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java index 9521fc9388..100367625f 100644 --- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java @@ -124,6 +124,8 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { promise.setFailure(failCause); ReferenceCountUtil.release(msg); } else { + // Do not special case GracefulServerCloseCommand, as we don't want to cause handshake + // failures. if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) { // No point in continuing negotiation ctx.close(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 2f01ed9928..170273e2c6 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -350,6 +350,56 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase