diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 1c27222c2f..96636ab6ed 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -40,7 +40,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -222,21 +221,7 @@ class NettyClientTransport implements ConnectionClientTransport { // Start the write queue as soon as the channel is constructed handler.startWriteQueue(channel); // Start the connection operation to the server. - channel.connect(address).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - ChannelHandlerContext ctx = future.channel().pipeline().context(handler); - if (ctx != null) { - // NettyClientHandler doesn't propagate exceptions, but the negotiator will need the - // exception to fail any writes. Note that this fires after handler, because it is as if - // handler was propagating the notification. - ctx.fireExceptionCaught(future.cause()); - } - future.channel().pipeline().fireExceptionCaught(future.cause()); - } - } - }); + channel.connect(address); // This write will have no effect, yet it will only complete once the negotiationHandler // flushes any pending writes. channel.write(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() { diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index 2a2f9c7190..c7dc2f3e66 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -27,6 +27,8 @@ import io.grpc.Internal; import io.grpc.Status; import io.grpc.internal.GrpcUtil; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @@ -445,6 +447,25 @@ public final class ProtocolNegotiators { } } + /** + * Do not rely on channel handlers to propagate exceptions to us. + * {@link NettyClientHandler} is an example of a class that does not propagate exceptions. + * Add a listener to the connect future directly and do appropriate error handling. + */ + @Override + public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) throws Exception { + super.connect(ctx, remoteAddress, localAddress, promise); + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + fail(ctx, future.cause()); + } + } + }); + } + /** * If we encounter an exception, then notify all buffered writes that we failed. */