From c84ef8332e295ae20a06f4bdcaa95866821d494b Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 4 Feb 2015 13:01:17 -0800 Subject: [PATCH] Simplify connection callback handling in Netty Channel is available immediately after connect(), so register callbacks immediately instead of delaying. Setting channel is now delayed until it is actually safe to use. --- .../transport/netty/NettyClientTransport.java | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index 7819768eb5..eb62205a57 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -178,7 +178,8 @@ class NettyClientTransport extends AbstractClientTransport { b.handler(negotiation.initializer()); // Start the connection operation to the server. - b.connect(address).addListener(new ChannelFutureListener() { + final ChannelFuture connectFuture = b.connect(address); + connectFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { @@ -188,43 +189,44 @@ class NettyClientTransport extends AbstractClientTransport { } // Connected successfully, start the protocol negotiation. - channel = future.channel(); negotiation.onConnected(channel); + } + }); + Futures.addCallback(negotiation.completeFuture(), new FutureCallback() { + @Override + public void onSuccess(Void result) { + // The negotiation was successful. + // We should not send on the channel until negotiation completes. This is a hard requirement + // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well. + channel = connectFuture.channel(); + notifyStarted(); + } - final ListenableFuture negotiationFuture = negotiation.completeFuture(); - Futures.addCallback(negotiationFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - // The negotiation was successful. - notifyStarted(); + @Override + public void onFailure(Throwable t) { + // The negotiation failed. + notifyFailed(t); + } + }); - // Handle transport shutdown when the channel is closed. - channel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - // The close failed. Just notify that transport shutdown failed. - notifyFailed(future.cause()); - return; - } + Channel channel = connectFuture.channel(); + // Handle transport shutdown when the channel is closed. + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // The close failed. Just notify that transport shutdown failed. + notifyFailed(future.cause()); + return; + } - if (handler.connectionError() != null) { - // The handler encountered a connection error. - notifyFailed(handler.connectionError()); - } else { - // Normal termination of the connection. - notifyStopped(); - } - } - }); - } - - @Override - public void onFailure(Throwable t) { - // The negotiation failed. - notifyFailed(t); - } - }); + if (handler.connectionError() != null) { + // The handler encountered a connection error. + notifyFailed(handler.connectionError()); + } else { + // Normal termination of the connection. + notifyStopped(); + } } }); }