diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index e2176668b7..3ae1e0a1ef 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -28,7 +28,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; -import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -62,6 +61,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -82,16 +82,13 @@ final class ClientCallImpl extends ClientCall { private final boolean callExecutorIsDirect; private final CallTracer channelCallsTracer; private final Context context; - private volatile ScheduledFuture deadlineCancellationFuture; + private CancellationHandler cancellationHandler; private final boolean unaryRequest; private CallOptions callOptions; private ClientStream stream; - private volatile boolean cancelListenersShouldBeRemoved; private boolean cancelCalled; private boolean halfCloseCalled; private final ClientStreamProvider clientStreamProvider; - private final ContextCancellationListener cancellationListener = - new ContextCancellationListener(); private final ScheduledExecutorService deadlineCancellationExecutor; private boolean fullStreamDecompression; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); @@ -128,13 +125,6 @@ final class ClientCallImpl extends ClientCall { PerfMark.event("ClientCall.", tag); } - private final class ContextCancellationListener implements CancellationListener { - @Override - public void cancelled(Context context) { - stream.cancel(statusFromCancelled(context)); - } - } - /** * Provider of {@link ClientStream}s. */ @@ -252,21 +242,21 @@ final class ClientCallImpl extends ClientCall { prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression); Deadline effectiveDeadline = effectiveDeadline(); - boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); + boolean contextIsDeadlineSource = effectiveDeadline != null + && effectiveDeadline.equals(context.getDeadline()); + cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource); + boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0; if (!deadlineExceeded) { - logIfContextNarrowedTimeout( - effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false); - String deadlineName = - isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context"; + String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions"; Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED); String description = String.format( "ClientCall started after %s deadline was exceeded %.9f seconds ago. " + "Name resolution delay %.9f seconds.", deadlineName, - effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS, + cancellationHandler.remainingNanos / NANO_TO_SECS, nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS); stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers); } @@ -298,21 +288,7 @@ final class ClientCallImpl extends ClientCall { // they receive cancel before start. Issue #1343 has more details // Propagate later Context cancellation to the remote side. - context.addListener(cancellationListener, directExecutor()); - if (effectiveDeadline != null - // If the context has the effective deadline, we don't need to schedule an extra task. - && !effectiveDeadline.equals(context.getDeadline()) - // If the channel has been terminated, we don't need to schedule an extra task. - && deadlineCancellationExecutor != null) { - deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); - } - if (cancelListenersShouldBeRemoved) { - // Race detected! ClientStreamListener.closed may have been called before - // deadlineCancellationFuture was set / context listener added, thereby preventing the future - // and listener from being cancelled. Go ahead and cancel again, just to be sure it - // was cancelled. - removeContextListenerAndCancelDeadlineFuture(); - } + cancellationHandler.setUp(); } private void applyMethodConfig() { @@ -354,54 +330,77 @@ final class ClientCallImpl extends ClientCall { } } - private static void logIfContextNarrowedTimeout( - Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, - @Nullable Deadline callDeadline) { - if (!log.isLoggable(Level.FINE) || effectiveDeadline == null - || !effectiveDeadline.equals(outerCallDeadline)) { - return; - } - - long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); - StringBuilder builder = new StringBuilder(String.format( - Locale.US, - "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); - if (callDeadline == null) { - builder.append(" Explicit call timeout was not set."); - } else { - long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); - builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout)); - } - - log.fine(builder.toString()); - } - - private void removeContextListenerAndCancelDeadlineFuture() { - context.removeListener(cancellationListener); - ScheduledFuture f = deadlineCancellationFuture; - if (f != null) { - f.cancel(false); - } - } - - private class DeadlineTimer implements Runnable { + private final class CancellationHandler implements Runnable, CancellationListener { + private final boolean contextIsDeadlineSource; + private final boolean hasDeadline; private final long remainingNanos; + private volatile ScheduledFuture deadlineCancellationFuture; + private volatile boolean tearDownCalled; - DeadlineTimer(long remainingNanos) { - this.remainingNanos = remainingNanos; + CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) { + this.contextIsDeadlineSource = contextIsDeadlineSource; + if (deadline == null) { + hasDeadline = false; + remainingNanos = 0; + } else { + hasDeadline = true; + remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); + } + } + + void setUp() { + if (tearDownCalled) { + return; + } + if (hasDeadline + // If the context has the effective deadline, we don't need to schedule an extra task. + && !contextIsDeadlineSource + // If the channel has been terminated, we don't need to schedule an extra task. + && deadlineCancellationExecutor != null) { + deadlineCancellationFuture = deadlineCancellationExecutor.schedule( + new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS); + } + context.addListener(this, directExecutor()); + if (tearDownCalled) { + // Race detected! Re-run to make sure the future is cancelled and context listener removed + tearDown(); + } + } + + // May be called multiple times, and race with setUp() + void tearDown() { + tearDownCalled = true; + ScheduledFuture deadlineCancellationFuture = this.deadlineCancellationFuture; + if (deadlineCancellationFuture != null) { + deadlineCancellationFuture.cancel(false); + } + context.removeListener(this); + } + + @Override + public void cancelled(Context context) { + if (hasDeadline && contextIsDeadlineSource + && context.cancellationCause() instanceof TimeoutException) { + stream.cancel(formatDeadlineExceededStatus()); + return; + } + stream.cancel(statusFromCancelled(context)); } @Override public void run() { - InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); + stream.cancel(formatDeadlineExceededStatus()); + } + + Status formatDeadlineExceededStatus() { // DelayedStream.cancel() is safe to call from a thread that is different from where the // stream is created. long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1); long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1); StringBuilder buf = new StringBuilder(); - buf.append("deadline exceeded after "); + buf.append(contextIsDeadlineSource ? "Context" : "CallOptions"); + buf.append(" deadline exceeded after "); if (remainingNanos < 0) { buf.append('-'); } @@ -409,20 +408,18 @@ final class ClientCallImpl extends ClientCall { buf.append(String.format(Locale.US, ".%09d", nanos)); buf.append("s. "); Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED); - buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ", + buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.", nsDelay == null ? 0 : nsDelay / NANO_TO_SECS)); - buf.append(insight); - stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString())); + if (stream != null) { + InsightBuilder insight = new InsightBuilder(); + stream.appendTimeoutInsight(insight); + buf.append(" "); + buf.append(insight); + } + return DEADLINE_EXCEEDED.withDescription(buf.toString()); } } - private ScheduledFuture startDeadlineTimer(Deadline deadline) { - long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS); - return deadlineCancellationExecutor.schedule( - new LogExceptionRunnable( - new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS); - } - @Nullable private Deadline effectiveDeadline() { // Call options and context are immutable, so we don't need to cache the deadline. @@ -440,16 +437,6 @@ final class ClientCallImpl extends ClientCall { return deadline0.minimum(deadline1); } - private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { - if (deadline0 == null) { - return false; - } - if (deadline1 == null) { - return true; - } - return deadline0.isBefore(deadline1); - } - @Override public void request(int numMessages) { try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) { @@ -493,7 +480,7 @@ final class ClientCallImpl extends ClientCall { stream.cancel(status); } } finally { - removeContextListenerAndCancelDeadlineFuture(); + cancellationHandler.tearDown(); } } @@ -699,10 +686,7 @@ final class ClientCallImpl extends ClientCall { // description. Since our timer may be delayed in firing, we double-check the deadline and // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. if (deadline.isExpired()) { - InsightBuilder insight = new InsightBuilder(); - stream.appendTimeoutInsight(insight); - status = DEADLINE_EXCEEDED.augmentDescription( - "ClientCall was cancelled at or after deadline. " + insight); + status = cancellationHandler.formatDeadlineExceededStatus(); // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } @@ -725,6 +709,7 @@ final class ClientCallImpl extends ClientCall { } private void runInternal() { + cancellationHandler.tearDown(); Status status = savedStatus; Metadata trailers = savedTrailers; if (exceptionStatus != null) { @@ -737,11 +722,9 @@ final class ClientCallImpl extends ClientCall { // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } - cancelListenersShouldBeRemoved = true; try { closeObserver(observer, status, trailers); } finally { - removeContextListenerAndCancelDeadlineFuture(); channelCallsTracer.reportCallEnded(status.isOk()); } } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 34011cd844..17d5e5b46b 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -926,7 +926,7 @@ public class ClientCallImplTest { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()) - .matches("deadline exceeded after [0-9]+\\.[0-9]+s. " + .matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. " + "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); } @@ -954,7 +954,9 @@ public class ClientCallImplTest { verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); - assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out"); + assertThat(statusCaptor.getValue().getDescription()) + .matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. " + + "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]"); } @Test diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index d450ece7bc..3efd576abe 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1190,7 +1190,7 @@ public abstract class AbstractInteropTest { assertTrue(desc, // There is a race between client and server-side deadline expiration. // If client expires first, it'd generate this message - Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc) + Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc) // If server expires first, it'd reset the stream and client would generate a different // message || desc.startsWith("ClientCall was cancelled at or after deadline."));