netty: fix getListenSockets race (#4301)

Move registration to separate future and wait for it.
This commit is contained in:
zpencer 2018-04-04 17:04:29 -07:00 committed by GitHub
parent 9ed84258aa
commit 49c8bdb60a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 11 deletions

View File

@ -92,9 +92,9 @@ class NettyServer implements InternalServer, WithLogId {
private final List<ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
private final Channelz channelz;
// Only set once during start(). This code assumes all listen sockets are created at startup
// and never changed. In the future we may have >1 listen socket.
private ImmutableList<Instrumented<SocketStats>> listenSockets;
// Only modified in event loop but safe to read any time. Set at startup and unset at shutdown.
// In the future we may have >1 listen socket.
private volatile ImmutableList<Instrumented<SocketStats>> listenSockets = ImmutableList.of();
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
@ -240,14 +240,7 @@ class NettyServer implements InternalServer, WithLogId {
}
});
// Bind and start to accept incoming connections.
ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
Instrumented<SocketStats> listenSocket = new ListenSocket(f.channel());
listenSockets = ImmutableList.of(listenSocket);
channelz.addListenSocket(listenSocket);
}
});
ChannelFuture future = b.bind(address);
try {
future.await();
} catch (InterruptedException ex) {
@ -258,6 +251,19 @@ class NettyServer implements InternalServer, WithLogId {
throw new IOException("Failed to bind", future.cause());
}
channel = future.channel();
Future<?> channelzFuture = channel.eventLoop().submit(new Runnable() {
@Override
public void run() {
Instrumented<SocketStats> listenSocket = new ListenSocket(channel);
listenSockets = ImmutableList.of(listenSocket);
channelz.addListenSocket(listenSocket);
}
});
try {
channelzFuture.await();
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while registering listen socket to channelz", ex);
}
}
@Override