Simplify connection callback handling in Netty

Channel is available immediately after connect(), so register callbacks
immediately instead of delaying.

Setting channel is now delayed until it is actually safe to use.
This commit is contained in:
Eric Anderson 2015-02-04 13:01:17 -08:00
parent e6a7b6c8a9
commit c84ef8332e
1 changed files with 36 additions and 34 deletions

View File

@ -178,7 +178,8 @@ class NettyClientTransport extends AbstractClientTransport {
b.handler(negotiation.initializer()); b.handler(negotiation.initializer());
// Start the connection operation to the server. // Start the connection operation to the server.
b.connect(address).addListener(new ChannelFutureListener() { final ChannelFuture connectFuture = b.connect(address);
connectFuture.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -188,43 +189,44 @@ class NettyClientTransport extends AbstractClientTransport {
} }
// Connected successfully, start the protocol negotiation. // Connected successfully, start the protocol negotiation.
channel = future.channel();
negotiation.onConnected(channel); negotiation.onConnected(channel);
}
});
Futures.addCallback(negotiation.completeFuture(), new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
// The negotiation was successful.
// We should not send on the channel until negotiation completes. This is a hard requirement
// by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
channel = connectFuture.channel();
notifyStarted();
}
final ListenableFuture<Void> negotiationFuture = negotiation.completeFuture(); @Override
Futures.addCallback(negotiationFuture, new FutureCallback<Void>() { public void onFailure(Throwable t) {
@Override // The negotiation failed.
public void onSuccess(Void result) { notifyFailed(t);
// The negotiation was successful. }
notifyStarted(); });
// Handle transport shutdown when the channel is closed. Channel channel = connectFuture.channel();
channel.closeFuture().addListener(new ChannelFutureListener() { // Handle transport shutdown when the channel is closed.
@Override channel.closeFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception { @Override
if (!future.isSuccess()) { public void operationComplete(ChannelFuture future) throws Exception {
// The close failed. Just notify that transport shutdown failed. if (!future.isSuccess()) {
notifyFailed(future.cause()); // The close failed. Just notify that transport shutdown failed.
return; notifyFailed(future.cause());
} return;
}
if (handler.connectionError() != null) { if (handler.connectionError() != null) {
// The handler encountered a connection error. // The handler encountered a connection error.
notifyFailed(handler.connectionError()); notifyFailed(handler.connectionError());
} else { } else {
// Normal termination of the connection. // Normal termination of the connection.
notifyStopped(); notifyStopped();
} }
}
});
}
@Override
public void onFailure(Throwable t) {
// The negotiation failed.
notifyFailed(t);
}
});
} }
}); });
} }