netty: handle racy stream cancellation on already failed transports

This commit is contained in:
Carl Mastrangelo 2019-01-22 15:15:35 -08:00 committed by GitHub
parent f0fc57878d
commit 985bf0aa74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 4 deletions

View File

@ -502,6 +502,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private void createStream(final CreateStreamCommand command, final ChannelPromise promise)
throws Exception {
if (lifecycleManager.getShutdownThrowable() != null) {
command.stream().setNonExistent();
// The connection is going away (it is really the GOAWAY case),
// just terminate the stream now.
command.stream().transportReportStatus(
@ -515,6 +516,7 @@ class NettyClientHandler extends AbstractNettyHandler {
try {
streamId = incrementAndGetNextStreamId();
} catch (StatusException e) {
command.stream().setNonExistent();
// Stream IDs have been exhausted for this connection. Fail the promise immediately.
promise.setFailure(e);
@ -588,7 +590,11 @@ class NettyClientHandler extends AbstractNettyHandler {
if (reason != null) {
stream.transportReportStatus(reason, true, new Metadata());
}
encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
if (!cmd.stream().isNonExistent()) {
encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
} else {
promise.setSuccess();
}
}
/**

View File

@ -212,6 +212,8 @@ class NettyClientStream extends AbstractClientStream {
/** This should only called from the transport thread. */
public abstract static class TransportState extends Http2ClientStreamTransportState
implements StreamIdHolder {
private static final int NON_EXISTENT_ID = -1;
private final NettyClientHandler handler;
private final EventLoop eventLoop;
private int id;
@ -230,14 +232,29 @@ class NettyClientStream extends AbstractClientStream {
@Override
public int id() {
// id should be positive
return id;
}
public void setId(int id) {
checkArgument(id > 0, "id must be positive");
checkArgument(id > 0, "id must be positive %s", id);
checkState(this.id == 0, "id has been previously set: %s", this.id);
this.id = id;
}
/**
* Marks the stream state as if it had never existed. This can happen if the stream is
* cancelled after it is created, but before it has been started.
*/
void setNonExistent() {
checkState(this.id == 0, "Id has been previously set: %s", this.id);
this.id = NON_EXISTENT_ID;
}
boolean isNonExistent() {
return this.id == NON_EXISTENT_ID;
}
/**
* Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the
* context of the transport thread.

View File

@ -17,6 +17,7 @@
package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
@ -542,8 +543,14 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
ChannelFuture future = createStream();
assertTrue(future.isSuccess());
TransportStateImpl newStreamTransportState = new TransportStateImpl(
handler(),
channel().eventLoop(),
DEFAULT_MAX_MESSAGE_SIZE,
transportTracer);
// This should fail - out of stream IDs.
future = createStream();
future = enqueue(newCreateStreamCommand(grpcHeaders, newStreamTransportState));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = lifecycleManager.getShutdownStatus();
@ -552,6 +559,22 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
status.getDescription().contains("exhausted"));
}
@Test
public void nonExistentStream() throws Exception {
Status status = Status.INTERNAL.withDescription("zz");
lifecycleManager.notifyShutdown(status);
// Stream creation can race with the transport shutting down, with the create command already
// enqueued.
ChannelFuture future1 = createStream();
future1.await();
assertNotNull(future1.cause());
assertThat(Status.fromThrowable(future1.cause()).getCode()).isEqualTo(status.getCode());
ChannelFuture future2 = enqueue(new CancelClientStreamCommand(streamTransportState, status));
future2.sync();
}
@Test
public void ping() throws Exception {
PingCallbackImpl callback1 = new PingCallbackImpl();

View File

@ -515,7 +515,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
transportTracer,
CallOptions.DEFAULT);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
reset(listener);
return stream;