mirror of https://github.com/grpc/grpc-java.git
netty: Reduce race window size between GOAWAY and new streams
The race between new streams and transport shutdown is #2562, but it is still far from being generally solved. This reduces the race window of new streams from (transport selection → stream created on network thread) to (transport selection → stream enqueued on network thread). Since only a single thread now needs to do work in the stream creation race window, the window should be dramatically smaller. This only reduces GOAWAY races when the server performs a graceful shutdown (using two GOAWAYs), as that is the only non-racy way on-the-wire to shutdown a connection in HTTP/2.
This commit is contained in:
parent
4974b51c53
commit
9ead606b84
|
|
@ -43,14 +43,25 @@ final class ClientTransportLifecycleManager {
|
||||||
listener.transportReady();
|
listener.transportReady();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyShutdown(Status s) {
|
/**
|
||||||
|
* Marks transport as shutdown, but does not set the error status. This must eventually be
|
||||||
|
* followed by a call to notifyShutdown.
|
||||||
|
*/
|
||||||
|
public void notifyGracefulShutdown(Status s) {
|
||||||
if (transportShutdown) {
|
if (transportShutdown) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transportShutdown = true;
|
transportShutdown = true;
|
||||||
|
listener.transportShutdown(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyShutdown(Status s) {
|
||||||
|
notifyGracefulShutdown(s);
|
||||||
|
if (shutdownStatus != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
shutdownStatus = s;
|
shutdownStatus = s;
|
||||||
shutdownThrowable = s.asException();
|
shutdownThrowable = s.asException();
|
||||||
listener.transportShutdown(s);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyInUse(boolean inUse) {
|
public void notifyInUse(boolean inUse) {
|
||||||
|
|
|
||||||
|
|
@ -755,10 +755,21 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handler for a GOAWAY being received. Fails any streams created after the
|
* Handler for a GOAWAY being received. Fails any streams created after the
|
||||||
* last known stream.
|
* last known stream. May only be called during a read.
|
||||||
*/
|
*/
|
||||||
private void goingAway(Status status) {
|
private void goingAway(Status status) {
|
||||||
|
lifecycleManager.notifyGracefulShutdown(status);
|
||||||
|
// Try to allocate as many in-flight streams as possible, to reduce race window of
|
||||||
|
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
|
||||||
|
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
|
||||||
|
// after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
|
||||||
|
// processed and thus this processing must be in-line before processing additional reads.
|
||||||
|
|
||||||
|
// This can cause reentrancy, but should be minor since it is normal to handle writes in
|
||||||
|
// response to a read. Also, the call stack is rather shallow at this point
|
||||||
|
clientWriteQueue.drainNow();
|
||||||
lifecycleManager.notifyShutdown(status);
|
lifecycleManager.notifyShutdown(status);
|
||||||
|
|
||||||
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
|
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
|
||||||
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
|
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,19 @@ class WriteQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes enqueued work directly on the current thread. This can be used to trigger writes
|
||||||
|
* before performing additional reads. Must be called from the event loop. This method makes no
|
||||||
|
* guarantee that the work queue is empty when it returns.
|
||||||
|
*/
|
||||||
|
void drainNow() {
|
||||||
|
Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
|
||||||
|
if (queue.peek() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
flush();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the queue of commands and dispatch them to the stream. This method is only
|
* Process the queue of commands and dispatch them to the stream. This method is only
|
||||||
* called in the event loop
|
* called in the event loop
|
||||||
|
|
|
||||||
|
|
@ -361,6 +361,19 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void receivedGoAwayShouldNotAffectRacingQueuedStreamId() throws Exception {
|
||||||
|
// This command has not actually been executed yet
|
||||||
|
ChannelFuture future = writeQueue().enqueue(
|
||||||
|
newCreateStreamCommand(grpcHeaders, streamTransportState), true);
|
||||||
|
channelRead(goAwayFrame(streamId));
|
||||||
|
verify(streamListener, never())
|
||||||
|
.closed(any(Status.class), any(Metadata.class));
|
||||||
|
verify(streamListener, never())
|
||||||
|
.closed(any(Status.class), any(RpcProgress.class), any(Metadata.class));
|
||||||
|
assertTrue(future.isDone());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void receivedResetWithRefuseCode() throws Exception {
|
public void receivedResetWithRefuseCode() throws Exception {
|
||||||
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
||||||
|
|
|
||||||
|
|
@ -203,6 +203,10 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected final WriteQueue writeQueue() {
|
||||||
|
return writeQueue;
|
||||||
|
}
|
||||||
|
|
||||||
protected final T handler() {
|
protected final T handler() {
|
||||||
return handler;
|
return handler;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue