diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 13acf79a87..372a16dff6 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -68,13 +68,19 @@ final class ClientCallImpl extends ClientCall { private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS = "gzip".getBytes(Charset.forName("US-ASCII")); + // When a deadline is exceeded, there is a race between the server receiving the cancellation from + // the client and the server cancelling the stream itself. If the client's cancellation is + // received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED. + // This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common + // monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation. 
+ @VisibleForTesting + static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1); private final MethodDescriptor method; private final Tag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; - private volatile ScheduledFuture deadlineCancellationFuture; private final boolean unaryRequest; private final CallOptions callOptions; private final boolean retryEnabled; @@ -83,11 +89,14 @@ final class ClientCallImpl extends ClientCall { private boolean cancelCalled; private boolean halfCloseCalled; private final ClientTransportProvider clientTransportProvider; - private final CancellationListener cancellationListener = new ContextCancellationListener(); + private ContextCancellationListener cancellationListener; private final ScheduledExecutorService deadlineCancellationExecutor; private boolean fullStreamDecompression; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); + private volatile ScheduledFuture deadlineCancellationNotifyApplicationFuture; + private volatile ScheduledFuture deadlineCancellationSendToServerFuture; + private boolean observerClosed = false; ClientCallImpl( MethodDescriptor method, Executor executor, CallOptions callOptions, @@ -117,9 +126,20 @@ final class ClientCallImpl extends ClientCall { } private final class ContextCancellationListener implements CancellationListener { + private Listener observer; + + private ContextCancellationListener(Listener observer) { + this.observer = observer; + } + @Override public void cancelled(Context context) { - stream.cancel(statusFromCancelled(context)); + if (context.getDeadline() == null || !context.getDeadline().isExpired()) { + stream.cancel(statusFromCancelled(context)); + } else { + Status status = statusFromCancelled(context); + delayedCancelOnDeadlineExceeded(status, observer); + } 
} } @@ -203,18 +223,7 @@ final class ClientCallImpl extends ClientCall { // Context is already cancelled so no need to create a real stream, just notify the observer // of cancellation via callback on the executor stream = NoopClientStream.INSTANCE; - class ClosedByContext extends ContextRunnable { - ClosedByContext() { - super(context); - } - - @Override - public void runInContext() { - closeObserver(observer, statusFromCancelled(context), new Metadata()); - } - } - - callExecutor.execute(new ClosedByContext()); + executeCloseObserverInContext(observer, statusFromCancelled(context)); return; } final String compressorName = callOptions.getCompressor(); @@ -223,22 +232,9 @@ final class ClientCallImpl extends ClientCall { compressor = compressorRegistry.lookupCompressor(compressorName); if (compressor == null) { stream = NoopClientStream.INSTANCE; - class ClosedByNotFoundCompressor extends ContextRunnable { - ClosedByNotFoundCompressor() { - super(context); - } - - @Override - public void runInContext() { - closeObserver( - observer, - Status.INTERNAL.withDescription( - String.format("Unable to find compressor by name %s", compressorName)), - new Metadata()); - } - } - - callExecutor.execute(new ClosedByNotFoundCompressor()); + Status status = Status.INTERNAL.withDescription( + String.format("Unable to find compressor by name %s", compressorName)); + executeCloseObserverInContext(observer, status); return; } } else { @@ -287,6 +283,7 @@ final class ClientCallImpl extends ClientCall { } stream.setDecompressorRegistry(decompressorRegistry); channelCallsTracer.reportCallStarted(); + cancellationListener = new ContextCancellationListener(observer); stream.start(new ClientStreamListenerImpl(observer)); // Delay any sources of cancellation after start(), because most of the transports are broken if @@ -298,8 +295,11 @@ final class ClientCallImpl extends ClientCall { // If the context has the effective deadline, we don't need to schedule an extra task. 
&& !effectiveDeadline.equals(context.getDeadline()) // If the channel has been terminated, we don't need to schedule an extra task. - && deadlineCancellationExecutor != null) { - deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); + && deadlineCancellationExecutor != null + // if already expired deadline let failing stream handle + && !(stream instanceof FailingClientStream)) { + deadlineCancellationNotifyApplicationFuture = + startDeadlineNotifyApplicationTimer(effectiveDeadline, observer); } if (cancelListenersShouldBeRemoved) { // Race detected! ClientStreamListener.closed may have been called before @@ -333,46 +333,98 @@ final class ClientCallImpl extends ClientCall { private void removeContextListenerAndCancelDeadlineFuture() { context.removeListener(cancellationListener); - ScheduledFuture f = deadlineCancellationFuture; + ScheduledFuture f = deadlineCancellationSendToServerFuture; + if (f != null) { + f.cancel(false); + } + + f = deadlineCancellationNotifyApplicationFuture; if (f != null) { f.cancel(false); } } - private class DeadlineTimer implements Runnable { - private final long remainingNanos; + private ScheduledFuture startDeadlineNotifyApplicationTimer(Deadline deadline, + final Listener observer) { + final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - DeadlineTimer(long remainingNanos) { - this.remainingNanos = remainingNanos; - } - - @Override - public void run() { - InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); - // DelayedStream.cancel() is safe to call from a thread that is different from where the - // stream is created. 
- long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); - long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); - - StringBuilder buf = new StringBuilder(); - buf.append("deadline exceeded after "); - if (remainingNanos < 0) { - buf.append('-'); + class DeadlineExceededNotifyApplicationTimer implements Runnable { + @Override + public void run() { + Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos); + delayedCancelOnDeadlineExceeded(status, observer); } - buf.append(seconds); - buf.append(String.format(".%09d", nanos)); - buf.append("s. "); - buf.append(insight); - stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString())); } + + return deadlineCancellationExecutor.schedule( + new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()), + remainingNanos, + TimeUnit.NANOSECONDS); } - private ScheduledFuture startDeadlineTimer(Deadline deadline) { - long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - return deadlineCancellationExecutor.schedule( - new LogExceptionRunnable( - new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); + private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) { + final InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + + long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); + long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); + + StringBuilder buf = new StringBuilder(); + buf.append("deadline exceeded after "); + if (remainingNanos < 0) { + buf.append('-'); + } + buf.append(seconds); + buf.append(String.format(".%09d", nanos)); + buf.append("s. 
"); + buf.append(insight); + + return DEADLINE_EXCEEDED.augmentDescription(buf.toString()); + } + + private void delayedCancelOnDeadlineExceeded(final Status status, Listener observer) { + if (deadlineCancellationSendToServerFuture != null) { + return; + } + + class DeadlineExceededSendCancelToServerTimer implements Runnable { + @Override + public void run() { + // DelayedStream.cancel() is safe to call from a thread that is different from where the + // stream is created. + stream.cancel(status); + } + } + + // This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a + // stream multiple time is safe, the race here is fine. + deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule( + new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()), + DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS, + TimeUnit.NANOSECONDS); + executeCloseObserverInContext(observer, status); + } + + private void executeCloseObserverInContext(final Listener observer, final Status status) { + class CloseInContext extends ContextRunnable { + CloseInContext() { + super(context); + } + + @Override + public void runInContext() { + closeObserver(observer, status, new Metadata()); + } + } + + callExecutor.execute(new CloseInContext()); + } + + private void closeObserver(Listener observer, Status status, Metadata trailers) { + if (!observerClosed) { + observerClosed = true; + observer.onClose(status, trailers); + } } @Nullable @@ -517,10 +569,6 @@ final class ClientCallImpl extends ClientCall { return Attributes.EMPTY; } - private void closeObserver(Listener observer, Status status, Metadata trailers) { - observer.onClose(status, trailers); - } - @Override public String toString() { return MoreObjects.toStringHelper(this).add("method", method).toString(); diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index ffd511dd62..091de2cc26 100644 --- 
a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS; import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -54,6 +55,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.testing.TestMethodDescriptors; @@ -131,6 +133,9 @@ public class ClientCallImplTest { @Captor private ArgumentCaptor statusArgumentCaptor; + @Captor + private ArgumentCaptor metadataArgumentCaptor; + private CallOptions baseCallOptions; @Before @@ -165,7 +170,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -187,7 +192,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -225,7 +230,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final 
ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -261,7 +266,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -296,7 +301,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -320,7 +325,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -337,7 +342,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); final Metadata metadata = new Metadata(); @@ -356,7 +361,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -527,7 +532,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); context.detach(previous); @@ -605,7 +610,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); cancellableContext.detach(previous); @@ -635,7 +640,7 @@ public class 
ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); cancellableContext.detach(previous); @@ -680,7 +685,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); verify(transport, times(0)) @@ -705,7 +710,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); context.detach(origContext); @@ -730,7 +735,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); context.detach(origContext); @@ -755,7 +760,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); context.detach(origContext); @@ -776,7 +781,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); @@ -794,7 +799,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); verify(stream, never()).setDeadline(any(Deadline.class)); @@ -808,17 +813,30 @@ public class ClientCallImplTest { ClientCallImpl call = new ClientCallImpl<>( method, MoreExecutors.directExecutor(), - baseCallOptions.withDeadline(Deadline.after(1, 
TimeUnit.SECONDS)), + baseCallOptions.withDeadline( + Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())), provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); - fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1); + fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); - verify(stream, times(1)).cancel(statusCaptor.capture()); + // Verify cancel is sent to the application once the deadline has just passed + verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture()); + assertThat(statusCaptor.getValue().getDescription()) + .matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verify(stream, never()).cancel(statusCaptor.capture()); + + fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1); + verify(stream, never()).cancel(any(Status.class)); + + // Verify the cancel sent to the server is delayed by DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS + fakeClock.forwardNanos(1); + verify(stream).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()) .matches("deadline exceeded after [0-9]+\\.[0-9]+s. 
\\[remote_addr=127\\.0\\.0\\.1:443\\]"); @@ -828,8 +846,8 @@ public class ClientCallImplTest { public void expiredDeadlineCancelsStream_Context() { fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); - Context context = Context.current() - .withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor); + Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker()); + Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor); Context origContext = context.attach(); ClientCallImpl call = new ClientCallImpl<>( @@ -839,15 +857,22 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); context.detach(origContext); call.start(callListener, new Metadata()); - fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1); + fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS); + verify(stream, never()).cancel(statusCaptor.capture()); + // verify app is notified. 
+ verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture()); + assertThat(statusCaptor.getValue().getDescription()).contains("context timed out"); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); - verify(stream, times(1)).cancel(statusCaptor.capture()); + // Verify the cancel sent to the server is delayed by DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS + fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS); + verify(stream).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out"); } @@ -863,7 +888,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); call.start(callListener, new Metadata()); call.cancel("canceled", null); @@ -888,7 +913,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); Metadata headers = new Metadata(); @@ -906,7 +931,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */); + /* retryEnabled= */ false); final Exception cause = new Exception(); ClientCall.Listener callListener = new ClientCall.Listener() { @@ -944,7 +969,7 @@ public class ClientCallImplTest { provider, deadlineCancellationExecutor, channelCallTracer, - false /* retryEnabled */) + /* retryEnabled= */ false) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -957,7 +982,7 @@ public class ClientCallImplTest { public void getAttributes() { ClientCallImpl call = new ClientCallImpl<>( method, MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */); + deadlineCancellationExecutor, channelCallTracer, /* 
retryEnabled= */ false); Attributes attrs = Attributes.newBuilder().set(Key.create("fake key"), "fake value").build(); when(stream.getAttributes()).thenReturn(attrs); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 8dc4a51419..a278ae5802 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -3477,7 +3477,7 @@ public class ManagedChannelImplTest { CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS)); ListenableFuture future2 = ClientCalls.futureUnaryCall(call2, null); - timer.forwardTime(1234, TimeUnit.SECONDS); + timer.forwardTime(5, TimeUnit.SECONDS); executor.runDueTasks(); try { @@ -3488,6 +3488,9 @@ public class ManagedChannelImplTest { } mychannel.shutdownNow(); + // For DEADLINE_EXCEEDED, stream shutdown is now delayed, so calling shutdownNow() on an open + // stream adds a task to the executor. Run that task here to clean it up. + executor.runDueTasks(); } @Deprecated