Avoid cancel before start, which is not handled in transports

When triggered, it caused the ClientCall.Listener never to complete.
Fixes #1343

The new test doesn't actually fail on my machine with the old code, but
we would hope it would be flaky. Since a race is involved, I don't
expect a more reliable test.
This commit is contained in:
Eric Anderson 2016-01-25 11:08:14 -08:00
parent 15d86d9c42
commit fe5e624153
2 changed files with 34 additions and 1 deletions

View File

@ -85,6 +85,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
private final CallOptions callOptions; private final CallOptions callOptions;
private ClientStream stream; private ClientStream stream;
private volatile ScheduledFuture<?> deadlineCancellationFuture; private volatile ScheduledFuture<?> deadlineCancellationFuture;
private volatile boolean deadlineCancellationFutureShouldBeCancelled;
private boolean cancelCalled; private boolean cancelCalled;
private boolean halfCloseCalled; private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider; private final ClientTransportProvider clientTransportProvider;
@ -243,14 +244,23 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
stream.setMessageCompression(true); stream.setMessageCompression(true);
} }
stream.start(new ClientStreamListenerImpl(observer, transportFuture));
// Delay any sources of cancellation after start(), because most of the transports are broken if
// they receive cancel before start. Issue #1343 has more details
// Start the deadline timer after stream creation because it will close the stream // Start the deadline timer after stream creation because it will close the stream
Long timeoutNanos = getRemainingTimeoutNanos(callOptions.getDeadlineNanoTime()); Long timeoutNanos = getRemainingTimeoutNanos(callOptions.getDeadlineNanoTime());
if (timeoutNanos != null) { if (timeoutNanos != null) {
deadlineCancellationFuture = startDeadlineTimer(timeoutNanos); deadlineCancellationFuture = startDeadlineTimer(timeoutNanos);
if (deadlineCancellationFutureShouldBeCancelled) {
// Race detected! ClientStreamListener.closed may have been called before
// deadlineCancellationFuture was set, thereby preventing the future from being cancelled.
// Go ahead and cancel again, just to be sure it was cancelled.
deadlineCancellationFuture.cancel(false);
}
} }
// Propagate later Context cancellation to the remote side. // Propagate later Context cancellation to the remote side.
this.context.addListener(this, MoreExecutors.directExecutor()); this.context.addListener(this, MoreExecutors.directExecutor());
stream.start(new ClientStreamListenerImpl(observer, transportFuture));
} }
/** /**
@ -445,6 +455,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
public final void runInContext() { public final void runInContext() {
try { try {
closed = true; closed = true;
deadlineCancellationFutureShouldBeCancelled = true;
// manually optimize the volatile read // manually optimize the volatile read
ScheduledFuture<?> future = deadlineCancellationFuture; ScheduledFuture<?> future = deadlineCancellationFuture;
if (future != null) { if (future != null) {

View File

@ -658,6 +658,28 @@ public abstract class AbstractTransportTest {
assertEquals(Status.DEADLINE_EXCEEDED, Status.fromThrowable(recorder.getError())); assertEquals(Status.DEADLINE_EXCEEDED, Status.fromThrowable(recorder.getError()));
} }
@Test(timeout = 10000)
public void deadlineInPast() throws Exception {
// Test once with idle channel and once with active channel
try {
TestServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(-10, TimeUnit.SECONDS)
.emptyCall(Empty.getDefaultInstance());
} catch (StatusRuntimeException ex) {
assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
}
// warm up the channel
blockingStub.emptyCall(Empty.getDefaultInstance());
try {
TestServiceGrpc.newBlockingStub(channel)
.withDeadlineAfter(-10, TimeUnit.SECONDS)
.emptyCall(Empty.getDefaultInstance());
} catch (StatusRuntimeException ex) {
assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
}
}
protected int unaryPayloadLength() { protected int unaryPayloadLength() {
// 10MiB. // 10MiB.
return 10485760; return 10485760;