diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index cf6acb3448..2698476cc9 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -85,6 +85,7 @@ final class ClientCallImpl extends ClientCall private final CallOptions callOptions; private ClientStream stream; private volatile ScheduledFuture deadlineCancellationFuture; + private volatile boolean deadlineCancellationFutureShouldBeCancelled; private boolean cancelCalled; private boolean halfCloseCalled; private final ClientTransportProvider clientTransportProvider; @@ -243,14 +244,23 @@ final class ClientCallImpl extends ClientCall stream.setMessageCompression(true); } + stream.start(new ClientStreamListenerImpl(observer, transportFuture)); + // Delay any sources of cancellation after start(), because most of the transports are broken if + // they receive cancel before start. Issue #1343 has more details + // Start the deadline timer after stream creation because it will close the stream Long timeoutNanos = getRemainingTimeoutNanos(callOptions.getDeadlineNanoTime()); if (timeoutNanos != null) { deadlineCancellationFuture = startDeadlineTimer(timeoutNanos); + if (deadlineCancellationFutureShouldBeCancelled) { + // Race detected! ClientStreamListener.closed may have been called before + // deadlineCancellationFuture was set, thereby preventing the future from being cancelled. + // Go ahead and cancel again, just to be sure it was cancelled. + deadlineCancellationFuture.cancel(false); + } } // Propagate later Context cancellation to the remote side. this.context.addListener(this, MoreExecutors.directExecutor()); - stream.start(new ClientStreamListenerImpl(observer, transportFuture)); } /** @@ -445,6 +455,7 @@ final class ClientCallImpl extends ClientCall public final void runInContext() { try { closed = true; + deadlineCancellationFutureShouldBeCancelled = true; // manually optimize the volatile read ScheduledFuture future = deadlineCancellationFuture; if (future != null) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java index 43d08c57a3..14d914f067 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java @@ -658,6 +658,28 @@ public abstract class AbstractTransportTest { assertEquals(Status.DEADLINE_EXCEEDED, Status.fromThrowable(recorder.getError())); } + @Test(timeout = 10000) + public void deadlineInPast() throws Exception { + // Test once with idle channel and once with active channel + try { + TestServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(-10, TimeUnit.SECONDS) + .emptyCall(Empty.getDefaultInstance()); + } catch (StatusRuntimeException ex) { + assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); + } + + // warm up the channel + blockingStub.emptyCall(Empty.getDefaultInstance()); + try { + TestServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(-10, TimeUnit.SECONDS) + .emptyCall(Empty.getDefaultInstance()); + } catch (StatusRuntimeException ex) { + assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); + } + } + protected int unaryPayloadLength() { // 10MiB. return 10485760;