diff --git a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java index b348a1b2ab..a47bcc7f29 100644 --- a/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java @@ -29,6 +29,8 @@ import io.netty.util.ReferenceCountUtil; import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Queue; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or @@ -37,6 +39,8 @@ import java.util.Queue; * i.e. before it's active or the TLS Handshake is complete. */ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { + private static final Logger logger = + Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName()); private final Queue bufferedWrites = new ArrayDeque<>(); private final ChannelHandler next; @@ -75,9 +79,21 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + assert cause != null; + Throwable previousFailure = failCause; Status status = Utils.statusFromThrowable(cause); failWrites(status.asRuntimeException()); - if (ctx.channel().isActive()) { + // Check to see if the channel is active and this is the first failure. If a downstream + // handler triggers an exception in close(), avoid being reentrant. This is not obviously + // correct, so here are the cases and how they are correctly handled: + // 1. !active, prev==null: the channel is inactive, no-op + // 2. !active, prev!=null: the channel is inactive, no-op + // 3. active, prev==null: this is the first error, close + // 4a. active, prev!=null[channelInactive]: impossible, no-op + // 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close(). + // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write. + // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect. + if (ctx.channel().isActive() && previousFailure == null) { ctx.close(); } } @@ -178,6 +194,8 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { private void failWrites(Throwable cause) { if (failCause == null) { failCause = cause; + } else { + logger.log(Level.FINE, "Ignoring duplicate failure", cause); } while (!bufferedWrites.isEmpty()) { ChannelWrite write = bufferedWrites.poll(); diff --git a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java index 565d9ca2a0..21fe571805 100644 --- a/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/WriteBufferingAndExceptionHandlerTest.java @@ -17,6 +17,7 @@ package io.grpc.netty; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -27,6 +28,7 @@ import io.grpc.Status.Code; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @@ -39,6 +41,7 @@ import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; import java.net.ConnectException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Rule; @@ -204,6 +207,40 @@ public class WriteBufferingAndExceptionHandlerTest { } } + @Test + public void uncaughtException_closeAtMostOnce() throws Exception { + final AtomicInteger closes = new AtomicInteger(); + WriteBufferingAndExceptionHandler handler = + new WriteBufferingAndExceptionHandler(new ChannelDuplexHandler() { + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + closes.getAndIncrement(); + // Simulates a loop between this handler and the WriteBufferingAndExceptionHandler. + ctx.fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException()); + super.close(ctx, promise); + } + }); + LocalAddress addr = new LocalAddress("local"); + ChannelFuture cf = new Bootstrap() + .channel(LocalChannel.class) + .handler(handler) + .group(group) + .register(); + chan = cf.channel(); + cf.sync(); + ChannelFuture sf = new ServerBootstrap() + .channel(LocalServerChannel.class) + .childHandler(new ChannelHandlerAdapter() {}) + .group(group) + .bind(addr); + server = sf.channel(); + sf.sync(); + + chan.connect(addr).sync(); + chan.close().sync(); + assertEquals(1, closes.get()); + } + @Test public void handlerRemovedFailuresPropagated() throws Exception { WriteBufferingAndExceptionHandler handler =