netty: handle reentrant exception in WBAEH

This commit is contained in:
Carl Mastrangelo 2019-01-18 15:18:31 -08:00 committed by GitHub
parent 5dbe53c050
commit 6f83bfc393
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 1 deletions

View File

@ -29,6 +29,8 @@ import io.netty.util.ReferenceCountUtil;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
@ -37,6 +39,8 @@ import java.util.Queue;
* i.e. before it's active or the TLS Handshake is complete. * i.e. before it's active or the TLS Handshake is complete.
*/ */
final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
private static final Logger logger =
Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName());
private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>(); private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
private final ChannelHandler next; private final ChannelHandler next;
@ -75,9 +79,21 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
assert cause != null;
Throwable previousFailure = failCause;
Status status = Utils.statusFromThrowable(cause); Status status = Utils.statusFromThrowable(cause);
failWrites(status.asRuntimeException()); failWrites(status.asRuntimeException());
if (ctx.channel().isActive()) { // Check to see if the channel is active and this is the first failure. If a downstream
// handler triggers an exception in close(), avoid being reentrant. This is not obviously
// correct, so here are the cases and how they are correctly handled:
// 1. !active, prev==null: the channel is inactive, no-op
// 2. !active, prev!=null: the channel is inactive, no-op
// 3. active, prev==null: this is the first error, close
// 4a. active, prev!=null[channelInactive]: impossible, no-op
// 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close().
// 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
// 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
if (ctx.channel().isActive() && previousFailure == null) {
ctx.close(); ctx.close();
} }
} }
@ -178,6 +194,8 @@ final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
private void failWrites(Throwable cause) { private void failWrites(Throwable cause) {
if (failCause == null) { if (failCause == null) {
failCause = cause; failCause = cause;
} else {
logger.log(Level.FINE, "Ignoring duplicate failure", cause);
} }
while (!bufferedWrites.isEmpty()) { while (!bufferedWrites.isEmpty()) {
ChannelWrite write = bufferedWrites.poll(); ChannelWrite write = bufferedWrites.poll();

View File

@ -17,6 +17,7 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -27,6 +28,7 @@ import io.grpc.Status.Code;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -39,6 +41,7 @@ import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel; import io.netty.channel.local.LocalServerChannel;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.After; import org.junit.After;
import org.junit.Rule; import org.junit.Rule;
@ -204,6 +207,40 @@ public class WriteBufferingAndExceptionHandlerTest {
} }
} }
@Test
public void uncaughtException_closeAtMostOnce() throws Exception {
final AtomicInteger closes = new AtomicInteger();
WriteBufferingAndExceptionHandler handler =
new WriteBufferingAndExceptionHandler(new ChannelDuplexHandler() {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
closes.getAndIncrement();
// Simulates a loop between this handler and the WriteBufferingAndExceptionHandler.
ctx.fireExceptionCaught(Status.ABORTED.withDescription("zap").asRuntimeException());
super.close(ctx, promise);
}
});
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
.channel(LocalChannel.class)
.handler(handler)
.group(group)
.register();
chan = cf.channel();
cf.sync();
ChannelFuture sf = new ServerBootstrap()
.channel(LocalServerChannel.class)
.childHandler(new ChannelHandlerAdapter() {})
.group(group)
.bind(addr);
server = sf.channel();
sf.sync();
chan.connect(addr).sync();
chan.close().sync();
assertEquals(1, closes.get());
}
@Test @Test
public void handlerRemovedFailuresPropagated() throws Exception { public void handlerRemovedFailuresPropagated() throws Exception {
WriteBufferingAndExceptionHandler handler = WriteBufferingAndExceptionHandler handler =