diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index 7819768eb5..eb62205a57 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -178,7 +178,8 @@ class NettyClientTransport extends AbstractClientTransport { b.handler(negotiation.initializer()); // Start the connection operation to the server. - b.connect(address).addListener(new ChannelFutureListener() { + final ChannelFuture connectFuture = b.connect(address); + connectFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { @@ -188,43 +189,44 @@ class NettyClientTransport extends AbstractClientTransport { } // Connected successfully, start the protocol negotiation. - channel = future.channel(); negotiation.onConnected(channel); + } + }); + Futures.addCallback(negotiation.completeFuture(), new FutureCallback() { + @Override + public void onSuccess(Void result) { + // The negotiation was successful. + // We should not send on the channel until negotiation completes. This is a hard requirement + // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well. + channel = connectFuture.channel(); + notifyStarted(); + } - final ListenableFuture negotiationFuture = negotiation.completeFuture(); - Futures.addCallback(negotiationFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - // The negotiation was successful. - notifyStarted(); + @Override + public void onFailure(Throwable t) { + // The negotiation failed. + notifyFailed(t); + } + }); - // Handle transport shutdown when the channel is closed. - channel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - // The close failed. Just notify that transport shutdown failed. - notifyFailed(future.cause()); - return; - } + Channel channel = connectFuture.channel(); + // Handle transport shutdown when the channel is closed. + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // The close failed. Just notify that transport shutdown failed. + notifyFailed(future.cause()); + return; + } - if (handler.connectionError() != null) { - // The handler encountered a connection error. - notifyFailed(handler.connectionError()); - } else { - // Normal termination of the connection. - notifyStopped(); - } - } - }); - } - - @Override - public void onFailure(Throwable t) { - // The negotiation failed. - notifyFailed(t); - } - }); + if (handler.connectionError() != null) { + // The handler encountered a connection error. + notifyFailed(handler.connectionError()); + } else { + // Normal termination of the connection. + notifyStopped(); + } } }); }