From fd8fd517d21165e1ad97a063861ec0790ed2c610 Mon Sep 17 00:00:00 2001 From: buchgr Date: Wed, 23 Mar 2016 21:08:44 +0100 Subject: [PATCH] Context deadline propagation should cascade. Fixes #1205 A call's timeout as specified in its metadata should be set depending on the deadline of the call's context. If a call has an explicit deadline set (through CallOptions), then the smaller deadline (from context and call options) should be used to compute the timeout. Also, a new method Contexts.statusFromCancelled(Context) was introduced that attempts to map a canceled context to a gRPC status. --- core/src/main/java/io/grpc/Context.java | 22 ++- core/src/main/java/io/grpc/Contexts.java | 33 ++++ .../java/io/grpc/internal/ClientCallImpl.java | 143 +++++++++++------- .../java/io/grpc/internal/ServerCallImpl.java | 11 +- .../java/io/grpc/internal/ServerImpl.java | 69 ++++----- core/src/test/java/io/grpc/ContextTest.java | 19 +++ core/src/test/java/io/grpc/ContextsTest.java | 138 +++++++++++++++++ .../io/grpc/internal/ClientCallImplTest.java | 116 +++++++++++++- .../grpc/internal/ManagedChannelImplTest.java | 6 +- .../io/grpc/internal/ServerCallImplTest.java | 20 ++- .../integration/AbstractInteropTest.java | 5 +- .../testing/integration/CascadingTest.java | 5 +- 12 files changed, 456 insertions(+), 131 deletions(-) create mode 100644 core/src/test/java/io/grpc/ContextsTest.java diff --git a/core/src/main/java/io/grpc/Context.java b/core/src/main/java/io/grpc/Context.java index 9a67ac2f3f..ad24f269b4 100644 --- a/core/src/main/java/io/grpc/Context.java +++ b/core/src/main/java/io/grpc/Context.java @@ -651,14 +651,20 @@ public class Context { ScheduledExecutorService scheduler) { super(parent, deriveDeadline(parent, deadline), true); if (DEADLINE_KEY.get(this) == deadline) { - // The parent deadline was after the new deadline so we need to install a listener - // on the new earlier deadline to trigger expiration for this context. - pendingDeadline = deadline.runOnExpiration(new Runnable() { - @Override - public void run() { - cancel(new TimeoutException("context timed out")); - } - }, scheduler); + final TimeoutException cause = new TimeoutException("context timed out"); + if (!deadline.isExpired()) { + // The parent deadline was after the new deadline so we need to install a listener + // on the new earlier deadline to trigger expiration for this context. + pendingDeadline = deadline.runOnExpiration(new Runnable() { + @Override + public void run() { + cancel(cause); + } + }, scheduler); + } else { + // Cancel immediately if the deadline is already expired. + cancel(cause); + } } uncancellableSurrogate = new Context(this, EMPTY_ENTRIES); } diff --git a/core/src/main/java/io/grpc/Contexts.java b/core/src/main/java/io/grpc/Contexts.java index 4217c43fa4..3b05470219 100644 --- a/core/src/main/java/io/grpc/Contexts.java +++ b/core/src/main/java/io/grpc/Contexts.java @@ -31,6 +31,10 @@ package io.grpc; +import com.google.common.base.Preconditions; + +import java.util.concurrent.TimeoutException; + /** * Utility methods for working with {@link Context}s in GRPC. */ @@ -130,4 +134,33 @@ public class Contexts { } } } + + /** + * Returns the {@link Status} of a cancelled context or {@code null} if the context + * is not cancelled. + */ + public static Status statusFromCancelled(Context context) { + Preconditions.checkNotNull(context, "context must not be null"); + if (!context.isCancelled()) { + return null; + } + + Throwable cancellationCause = context.cancellationCause(); + if (cancellationCause == null) { + return Status.CANCELLED; + } + if (cancellationCause instanceof TimeoutException) { + return Status.DEADLINE_EXCEEDED + .withDescription(cancellationCause.getMessage()) + .withCause(cancellationCause); + } + Status status = Status.fromThrowable(cancellationCause); + if (Status.Code.UNKNOWN.equals(status.getCode()) + && status.getCause() == cancellationCause) { + // If fromThrowable could not determine a status, then + // just return CANCELLED. + return Status.CANCELLED.withCause(cancellationCause); + } + return status.withCause(cancellationCause); + } } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 33dbb5dfab..6b83da9017 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -35,12 +35,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.grpc.Contexts.statusFromCancelled; +import static io.grpc.Status.DEADLINE_EXCEEDED; import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_JOINER; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; -import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -62,7 +64,9 @@ import io.grpc.Status; import java.io.InputStream; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; @@ -71,20 +75,22 @@ import javax.annotation.Nullable; */ final class ClientCallImpl extends ClientCall implements Context.CancellationListener { + + private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName()); + private final MethodDescriptor method; private final Executor callExecutor; - private final Context context; + private final Context parentContext; + private volatile Context context; private final boolean unaryRequest; private final CallOptions callOptions; private ClientStream stream; - private volatile ScheduledFuture deadlineCancellationFuture; - private volatile boolean deadlineCancellationFutureShouldBeCancelled; + private volatile boolean contextListenerShouldBeRemoved; private boolean cancelCalled; private boolean halfCloseCalled; private final ClientTransportProvider clientTransportProvider; private String userAgent; private ScheduledExecutorService deadlineCancellationExecutor; - private Compressor compressor; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); @@ -99,7 +105,7 @@ final class ClientCallImpl extends ClientCall ? new SerializeReentrantCallsDirectExecutor() : new SerializingExecutor(executor); // Propagate the context from the thread which initiated the call to all callbacks. - this.context = Context.current(); + this.parentContext = Context.current(); this.unaryRequest = method.getType() == MethodType.UNARY || method.getType() == MethodType.SERVER_STREAMING; this.callOptions = callOptions; @@ -109,7 +115,7 @@ final class ClientCallImpl extends ClientCall @Override public void cancelled(Context context) { - stream.cancel(Status.CANCELLED.withCause(context.cancellationCause())); + stream.cancel(statusFromCancelled(context)); } /** @@ -165,6 +171,14 @@ final class ClientCallImpl extends ClientCall checkNotNull(observer, "observer"); checkNotNull(headers, "headers"); + // Create the context + final Deadline effectiveDeadline = min(callOptions.getDeadline(), parentContext.getDeadline()); + if (effectiveDeadline != parentContext.getDeadline()) { + context = parentContext.withDeadline(effectiveDeadline, deadlineCancellationExecutor); + } else { + context = parentContext.withCancellation(); + } + if (context.isCancelled()) { // Context is already cancelled so no need to create a real stream, just notify the observer // of cancellation via callback on the executor @@ -172,12 +186,13 @@ final class ClientCallImpl extends ClientCall callExecutor.execute(new ContextRunnable(context) { @Override public void runInContext() { - observer.onClose(Status.CANCELLED.withCause(context.cancellationCause()), new Metadata()); + observer.onClose(statusFromCancelled(context), new Metadata()); } }); return; } final String compressorName = callOptions.getCompressor(); + Compressor compressor = null; if (compressorName != null) { compressor = compressorRegistry.lookupCompressor(compressorName); if (compressor == null) { @@ -199,11 +214,14 @@ final class ClientCallImpl extends ClientCall prepareHeaders(headers, callOptions, userAgent, decompressorRegistry, compressor); - if (updateTimeoutHeader(callOptions.getDeadline(), headers)) { + final boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); + if (!deadlineExceeded) { + updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), + parentContext.getDeadline(), headers); ClientTransport transport = clientTransportProvider.get(callOptions); stream = transport.newStream(method, headers); } else { - stream = new FailingClientStream(Status.DEADLINE_EXCEEDED); + stream = new FailingClientStream(DEADLINE_EXCEEDED); } if (callOptions.getAuthority() != null) { @@ -215,45 +233,66 @@ final class ClientCallImpl extends ClientCall if (compressor != Codec.Identity.NONE) { stream.setMessageCompression(true); } + // Delay any sources of cancellation after start(), because most of the transports are broken if // they receive cancel before start. Issue #1343 has more details - // Start the deadline timer after stream creation because it will close the stream - if (callOptions.getDeadline() != null) { - long timeoutNanos = callOptions.getDeadline().timeRemaining(NANOSECONDS); - deadlineCancellationFuture = startDeadlineTimer(timeoutNanos); - if (deadlineCancellationFutureShouldBeCancelled) { - // Race detected! ClientStreamListener.closed may have been called before - // deadlineCancellationFuture was set, thereby preventing the future from being cancelled. - // Go ahead and cancel again, just to be sure it was cancelled. - deadlineCancellationFuture.cancel(false); - } - } // Propagate later Context cancellation to the remote side. - this.context.addListener(this, directExecutor()); + context.addListener(this, directExecutor()); + if (contextListenerShouldBeRemoved) { + // Race detected! ClientStreamListener.closed may have been called before + // deadlineCancellationFuture was set, thereby preventing the future from being cancelled. + // Go ahead and cancel again, just to be sure it was cancelled. + context.removeListener(this); + } } /** * Based on the deadline, calculate and set the timeout to the given headers. - * - * @return {@code false} if deadline already exceeded */ - static boolean updateTimeoutHeader(@Nullable Deadline deadline, Metadata headers) { - // Fill out timeout on the headers - // TODO(carl-mastrangelo): Find out if this should always remove the timeout, - // even when returning false. + private static void updateTimeoutHeaders(@Nullable Deadline effectiveDeadline, + @Nullable Deadline callDeadline, @Nullable Deadline outerCallDeadline, Metadata headers) { headers.removeAll(TIMEOUT_KEY); - if (deadline != null) { - // Convert the deadline to timeout. Timeout is more favorable than deadline on the wire - // because timeout tolerates the clock difference between machines. - long timeoutNanos = deadline.timeRemaining(NANOSECONDS); - if (timeoutNanos <= 0) { - return false; - } - headers.put(TIMEOUT_KEY, timeoutNanos); + if (effectiveDeadline == null) { + return; } - return true; + + long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); + headers.put(TIMEOUT_KEY, effectiveTimeout); + + logIfContextNarrowedTimeout(effectiveTimeout, effectiveDeadline, outerCallDeadline, + callDeadline); + } + + private static void logIfContextNarrowedTimeout(long effectiveTimeout, + Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, + @Nullable Deadline callDeadline) { + if (!log.isLoggable(Level.INFO) || outerCallDeadline != effectiveDeadline) { + return; + } + + StringBuilder builder = new StringBuilder(); + builder.append(String.format("Call timeout set to '%d' ns, due to context deadline.", + effectiveTimeout)); + if (callDeadline == null) { + builder.append(" Explicit call timeout was not set."); + } else { + long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS); + builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout)); + } + + log.info(builder.toString()); + } + + private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { + if (deadline0 == null) { + return deadline1; + } + if (deadline1 == null) { + return deadline0; + } + return deadline0.minimum(deadline1); } @Override @@ -276,7 +315,9 @@ final class ClientCallImpl extends ClientCall stream.cancel(Status.CANCELLED); } } finally { - context.removeListener(ClientCallImpl.this); + if (context != null) { + context.removeListener(ClientCallImpl.this); + } } } @@ -321,15 +362,6 @@ final class ClientCallImpl extends ClientCall return stream.isReady(); } - private ScheduledFuture startDeadlineTimer(long timeoutNanos) { - return deadlineCancellationExecutor.schedule(new Runnable() { - @Override - public void run() { - stream.cancel(Status.DEADLINE_EXCEEDED); - } - }, timeoutNanos, NANOSECONDS); - } - private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; private boolean closed; @@ -394,13 +426,13 @@ final class ClientCallImpl extends ClientCall @Override public void closed(Status status, Metadata trailers) { - if (status.getCode() == Status.Code.CANCELLED && callOptions.getDeadline() != null) { + Deadline deadline = context.getDeadline(); + if (status.getCode() == Status.Code.CANCELLED && deadline != null) { // When the server's deadline expires, it can only reset the stream with CANCEL and no // description. Since our timer may be delayed in firing, we double-check the deadline and // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. - long timeoutNanos = callOptions.getDeadline().timeRemaining(NANOSECONDS); - if (timeoutNanos <= 0) { - status = Status.DEADLINE_EXCEEDED; + if (deadline.isExpired()) { + status = DEADLINE_EXCEEDED; // Replace trailers to prevent mixing sources of status and trailers. trailers = new Metadata(); } @@ -412,12 +444,7 @@ final class ClientCallImpl extends ClientCall public final void runInContext() { try { closed = true; - deadlineCancellationFutureShouldBeCancelled = true; - // manually optimize the volatile read - ScheduledFuture future = deadlineCancellationFuture; - if (future != null) { - future.cancel(false); - } + contextListenerShouldBeRemoved = true; observer.onClose(savedStatus, savedTrailers); } finally { context.removeListener(ClientCallImpl.this); diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index ffa858341e..8ab5564b28 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -58,7 +58,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Set; -import java.util.concurrent.Future; final class ServerCallImpl extends ServerCall { private final ServerStream stream; @@ -198,9 +197,8 @@ final class ServerCallImpl extends ServerCall { return cancelled; } - ServerStreamListener newServerStreamListener(ServerCall.Listener listener, - Future timeout) { - return new ServerStreamListenerImpl(this, listener, timeout, context); + ServerStreamListener newServerStreamListener(ServerCall.Listener listener) { + return new ServerStreamListenerImpl(this, listener, context); } @Override @@ -216,16 +214,14 @@ final class ServerCallImpl extends ServerCall { static final class ServerStreamListenerImpl implements ServerStreamListener { private final ServerCallImpl call; private final ServerCall.Listener listener; - private final Future timeout; private final Context.CancellableContext context; private boolean messageReceived; public ServerStreamListenerImpl( - ServerCallImpl call, ServerCall.Listener listener, Future timeout, + ServerCallImpl call, ServerCall.Listener listener, Context.CancellableContext context) { this.call = checkNotNull(call, "call"); this.listener = checkNotNull(listener, "listener must not be null"); - this.timeout = checkNotNull(timeout, "timeout"); this.context = checkNotNull(context, "context"); } @@ -265,7 +261,6 @@ final class ServerCallImpl extends ServerCall { @Override public void closed(Status status) { - timeout.cancel(true); try { if (status.isOk()) { listener.onComplete(); diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index c77f39a8d0..171ad1c648 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -32,12 +32,14 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.grpc.Contexts.statusFromCancelled; +import static io.grpc.Status.DEADLINE_EXCEEDED; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -55,10 +57,8 @@ import java.util.Collection; import java.util.HashSet; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Default implementation of {@link io.grpc.Server}, for creation by transports. @@ -77,17 +77,6 @@ import java.util.concurrent.TimeoutException; public final class ServerImpl extends io.grpc.Server { private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); - private static final Future DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture(); - - private static final TimeoutException TIMEOUT_EXCEPTION = - new TimeoutException("request timed out") { - @Override - public synchronized Throwable fillInStackTrace() { - // Suppress the stack trace as it would be confusing. - return this; - } - }; - /** Executor for application processing. */ private Executor executor; private boolean usingSharedExecutor; @@ -200,7 +189,7 @@ public final class ServerImpl extends io.grpc.Server { long timeoutNanos = unit.toNanos(timeout); long endTimeNanos = System.nanoTime() + timeoutNanos; while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { - TimeUnit.NANOSECONDS.timedWait(lock, timeoutNanos); + NANOSECONDS.timedWait(lock, timeoutNanos); } return terminated; } @@ -293,12 +282,11 @@ public final class ServerImpl extends io.grpc.Server { @Override public ServerStreamListener streamCreated(final ServerStream stream, final String methodName, final Metadata headers) { - final Context.CancellableContext context = rootContext.withCancellation(); - final Future timeout = scheduleTimeout(stream, headers, context); + final Context.CancellableContext context = createContext(stream, headers); final Executor wrappedExecutor; // This is a performance optimization that avoids the synchronization and queuing overhead // that comes with SerializingExecutor. - if (executor == MoreExecutors.directExecutor()) { + if (executor == directExecutor()) { wrappedExecutor = new SerializeReentrantCallsDirectExecutor(); } else { wrappedExecutor = new SerializingExecutor(executor); @@ -319,17 +307,17 @@ public final class ServerImpl extends io.grpc.Server { stream.close( Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), new Metadata()); - timeout.cancel(true); + context.cancel(null); return; } - listener = startCall(stream, methodName, method, timeout, headers, context); + listener = startCall(stream, methodName, method, headers, context); } catch (RuntimeException e) { stream.close(Status.fromThrowable(e), new Metadata()); - timeout.cancel(true); + context.cancel(null); throw e; } catch (Throwable t) { stream.close(Status.fromThrowable(t), new Metadata()); - timeout.cancel(true); + context.cancel(null); throw new RuntimeException(t); } finally { jumpListener.setListener(listener); @@ -339,31 +327,34 @@ public final class ServerImpl extends io.grpc.Server { return jumpListener; } - private Future scheduleTimeout(final ServerStream stream, Metadata headers, - final Context.CancellableContext context) { + private Context.CancellableContext createContext(final ServerStream stream, Metadata headers) { Long timeoutNanos = headers.get(TIMEOUT_KEY); + if (timeoutNanos == null) { - return DEFAULT_TIMEOUT_FUTURE; + return rootContext.withCancellation(); } - return timeoutService.schedule(new Runnable() { - @Override - public void run() { + + Context.CancellableContext context = + rootContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService); + context.addListener(new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + Status status = statusFromCancelled(context); + if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) { // This should rarely get run, since the client will likely cancel the stream before // the timeout is reached. - stream.cancel(Status.DEADLINE_EXCEEDED); - // Cancel the context using a statically created exception as this event may occur - // often enough that stack trace alloc impacts performance. - context.cancel(TIMEOUT_EXCEPTION); + stream.cancel(status); } - }, - timeoutNanos, - TimeUnit.NANOSECONDS); + } + }, directExecutor()); + + return context; } /** Never returns {@code null}. */ private ServerStreamListener startCall(ServerStream stream, String fullMethodName, - ServerMethodDefinition methodDef, Future timeout, - Metadata headers, Context.CancellableContext context) { + ServerMethodDefinition methodDef, Metadata headers, + Context.CancellableContext context) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? ServerCallImpl call = new ServerCallImpl( stream, methodDef.getMethodDescriptor(), headers, context, decompressorRegistry, @@ -374,7 +365,7 @@ public final class ServerImpl extends io.grpc.Server { throw new NullPointerException( "startCall() returned a null listener for method " + fullMethodName); } - return call.newServerStreamListener(listener, timeout); + return call.newServerStreamListener(listener); } } diff --git a/core/src/test/java/io/grpc/ContextTest.java b/core/src/test/java/io/grpc/ContextTest.java index 16e91083ff..56c897ad33 100644 --- a/core/src/test/java/io/grpc/ContextTest.java +++ b/core/src/test/java/io/grpc/ContextTest.java @@ -31,12 +31,14 @@ package io.grpc; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -734,4 +736,21 @@ public class ContextTest { assertTrue(parentCalled.get()); assertTrue(childAfterParent.get()); } + + @Test + public void expiredDeadlineShouldCancelContextImmediately() { + Context parent = Context.current(); + assertFalse(parent.isCancelled()); + + Context.CancellableContext context = parent.withDeadlineAfter(0, TimeUnit.SECONDS, scheduler); + assertTrue(context.isCancelled()); + assertThat(context.cancellationCause(), instanceOf(TimeoutException.class)); + + assertFalse(parent.isCancelled()); + Deadline deadline = Deadline.after(-10, TimeUnit.SECONDS); + assertTrue(deadline.isExpired()); + context = parent.withDeadline(deadline, scheduler); + assertTrue(context.isCancelled()); + assertThat(context.cancellationCause(), instanceOf(TimeoutException.class)); + } } diff --git a/core/src/test/java/io/grpc/ContextsTest.java b/core/src/test/java/io/grpc/ContextsTest.java new file mode 100644 index 0000000000..1a765700c9 --- /dev/null +++ b/core/src/test/java/io/grpc/ContextsTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static io.grpc.Contexts.statusFromCancelled; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import io.grpc.internal.FakeClock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests for {@link Contexts}. + */ +@RunWith(JUnit4.class) +public class ContextsTest { + + @Test + public void statusFromCancelled_returnNullIfCtxNotCancelled() { + Context context = Context.current(); + assertFalse(context.isCancelled()); + assertNull(statusFromCancelled(context)); + } + + @Test + public void statusFromCancelled_returnStatusAsSetOnCtx() { + Context.CancellableContext cancellableContext = Context.current().fork(); + cancellableContext.cancel(Status.DEADLINE_EXCEEDED.withDescription("foo bar").asException()); + Status status = statusFromCancelled(cancellableContext); + assertNotNull(status); + assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode()); + assertEquals("foo bar", status.getDescription()); + } + + @Test + public void statusFromCancelled_shouldReturnStatusWithCauseAttached() { + Context.CancellableContext cancellableContext = Context.current().fork(); + Throwable t = new Throwable(); + cancellableContext.cancel(t); + Status status = statusFromCancelled(cancellableContext); + assertNotNull(status); + assertEquals(Status.Code.CANCELLED, status.getCode()); + assertSame(t, status.getCause()); + } + + @Test + public void statusFromCancelled_TimeoutExceptionShouldMapToDeadlineExceeded() { + FakeClock fakeClock = new FakeClock(); + Context.CancellableContext cancellableContext = Context.current() + .withDeadlineAfter(100, TimeUnit.MILLISECONDS, fakeClock.scheduledExecutorService); + fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); + fakeClock.forwardMillis(100); + + assertTrue(cancellableContext.isCancelled()); + assertThat(cancellableContext.cancellationCause(), instanceOf(TimeoutException.class)); + + Status status = statusFromCancelled(cancellableContext); + assertNotNull(status); + assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode()); + assertEquals("context timed out", status.getDescription()); + } + + @Test + public void statusFromCancelled_returnCancelledIfCauseIsNull() { + Context.CancellableContext cancellableContext = Context.current().fork(); + cancellableContext.cancel(null); + assertTrue(cancellableContext.isCancelled()); + Status status = statusFromCancelled(cancellableContext); + assertNotNull(status); + assertEquals(Status.Code.CANCELLED, status.getCode()); + } + + /** This is a whitebox test, to verify a special case of the implementation. */ + @Test + public void statusFromCancelled_StatusUnknownShouldWork() { + Context.CancellableContext cancellableContext = Context.current().fork(); + Exception e = Status.UNKNOWN.asException(); + cancellableContext.cancel(e); + assertTrue(cancellableContext.isCancelled()); + + Status status = statusFromCancelled(cancellableContext); + assertNotNull(status); + assertEquals(Status.Code.UNKNOWN, status.getCode()); + assertSame(e, status.getCause()); + } + + @Test + public void statusFromCancelled_shouldThrowIfCtxIsNull() { + try { + statusFromCancelled(null); + fail("NPE expected"); + } catch (NullPointerException npe) { + assertEquals("context must not be null", npe.getMessage()); + } + } + +} diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index ef7819a638..8b77bb4421 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -34,6 +34,7 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -126,6 +127,9 @@ public class ClientCallImplTest { @Captor private ArgumentCaptor listenerArgumentCaptor; + @Captor + private ArgumentCaptor statusArgumentCaptor; + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -362,7 +366,10 @@ public class ClientCallImplTest { call.start(callListener, new Metadata()); - cancellableContext.cancel(new Throwable()); + Throwable t = new Throwable(); + cancellableContext.cancel(t); + + verify(stream, times(1)).cancel(statusArgumentCaptor.capture()); verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.CANCELLED, statusCaptor.getValue().getCode()); @@ -433,6 +440,107 @@ public class ClientCallImplTest { verifyZeroInteractions(provider); } + @Test + public void contextDeadlineShouldBePropagatedInMetadata() { + long deadlineNanos = TimeUnit.SECONDS.toNanos(1); + Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, + deadlineCancellationExecutor); + context.attach(); + + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + CallOptions.DEFAULT, + provider, + deadlineCancellationExecutor); + + Metadata headers = new Metadata(); + + call.start(callListener, headers); + + assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); + Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); + assertNotNull(timeout); + + long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); + assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos); + } + + @Test + public void contextDeadlineShouldOverrideLargerMetadataTimeout() { + long deadlineNanos = TimeUnit.SECONDS.toNanos(1); + Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, + deadlineCancellationExecutor); + context.attach(); + + CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(2, TimeUnit.SECONDS); + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + callOpts, + provider, + deadlineCancellationExecutor); + + Metadata headers = new Metadata(); + + call.start(callListener, headers); + + assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); + Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); + assertNotNull(timeout); + + long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); + assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos); + } + + @Test + public void contextDeadlineShouldNotOverrideSmallerMetadataTimeout() { + long deadlineNanos = TimeUnit.SECONDS.toNanos(2); + Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, + deadlineCancellationExecutor); + context.attach(); + + CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS); + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + callOpts, + provider, + deadlineCancellationExecutor); + + Metadata headers = new Metadata(); + + call.start(callListener, headers); + + assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); + Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); + assertNotNull(timeout); + + long callOptsNanos = TimeUnit.SECONDS.toNanos(1); + long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); + assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos); + } + + /** + * Without a context or call options deadline, + * a timeout should not be set in metadata. + */ + @Test + public void timeoutShouldNotBeSet() { + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + CallOptions.DEFAULT, + provider, + deadlineCancellationExecutor); + + Metadata headers = new Metadata(); + + call.start(callListener, headers); + + assertFalse(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); + } + private static class TestMarshaller implements Marshaller { @Override public InputStream stream(T value) { @@ -444,5 +552,11 @@ public class ClientCallImplTest { return null; } } + + private static void assertTimeoutBetween(long timeout, long from, long to) { + assertTrue("timeout: " + timeout + " ns", timeout <= to); + assertTrue("timeout: " + timeout + " ns", timeout >= from); + } + } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 83ed477bc4..d8038b7960 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -165,8 +165,10 @@ public class ManagedChannelImplTest { ClientCall call = channel.newCall(method, CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS)); call.start(mockCallListener, new Metadata()); - verify(mockCallListener, timeout(1000)).onClose( - same(Status.DEADLINE_EXCEEDED), any(Metadata.class)); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status = statusCaptor.getValue(); + assertSame(Status.DEADLINE_EXCEEDED.getCode(), status.getCode()); } @Test diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index f6e6bb59b5..e7d4cd91d1 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -189,7 +189,7 @@ public class ServerCallImplTest { @Test public void streamListener_halfClosed() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.halfClosed(); @@ -199,7 +199,7 @@ public class ServerCallImplTest { @Test public void streamListener_halfClosed_onlyOnce() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.halfClosed(); // canceling the call should short circuit future halfClosed() calls. streamListener.closed(Status.CANCELLED); @@ -212,12 +212,11 @@ public class ServerCallImplTest { @Test public void streamListener_closedOk() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.closed(Status.OK); verify(callListener).onComplete(); - assertTrue(timeout.isCancelled()); assertTrue(context.isCancelled()); assertNull(context.cancellationCause()); } @@ -225,12 +224,11 @@ public class ServerCallImplTest { @Test public void streamListener_closedCancelled() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.closed(Status.CANCELLED); verify(callListener).onCancel(); - assertTrue(timeout.isCancelled()); assertTrue(context.isCancelled()); assertNull(context.cancellationCause()); } @@ -238,7 +236,7 @@ public class ServerCallImplTest { @Test public void streamListener_onReady() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.onReady(); @@ -248,7 +246,7 @@ public class ServerCallImplTest { @Test public void streamListener_onReady_onlyOnce() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.onReady(); // canceling the call should short circuit future halfClosed() calls. streamListener.closed(Status.CANCELLED); @@ -261,7 +259,7 @@ public class ServerCallImplTest { @Test public void streamListener_messageRead() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.messageRead(method.streamRequest(1234L)); verify(callListener).onMessage(1234L); @@ -270,7 +268,7 @@ public class ServerCallImplTest { @Test public void streamListener_messageRead_unaryFailsOnMultiple() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.messageRead(method.streamRequest(1234L)); streamListener.messageRead(method.streamRequest(1234L)); @@ -284,7 +282,7 @@ public class ServerCallImplTest { @Test public void streamListener_messageRead_onlyOnce() { ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl(call, callListener, timeout, context); + new ServerCallImpl.ServerStreamListenerImpl(call, callListener, context); streamListener.messageRead(method.streamRequest(1234L)); // canceling the call should short circuit future halfClosed() calls. streamListener.closed(Status.CANCELLED); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index a60b09fddf..6fec2a396f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -643,7 +643,7 @@ public abstract class AbstractInteropTest { .build()).next(); fail("Expected deadline to be exceeded"); } catch (Throwable t) { - assertEquals(Status.DEADLINE_EXCEEDED, Status.fromThrowable(t)); + assertEquals(Status.DEADLINE_EXCEEDED.getCode(), Status.fromThrowable(t).getCode()); } } @@ -666,7 +666,8 @@ public abstract class AbstractInteropTest { .withDeadlineAfter(30, TimeUnit.MILLISECONDS) .streamingOutputCall(request, recorder); recorder.awaitCompletion(); - assertEquals(Status.DEADLINE_EXCEEDED, Status.fromThrowable(recorder.getError())); + assertEquals(Status.DEADLINE_EXCEEDED.getCode(), + Status.fromThrowable(recorder.getError()).getCode()); } @Test(timeout = 10000) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java index d245e34a18..b8115beca6 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java @@ -121,7 +121,7 @@ public class CascadingTest { } catch (StatusRuntimeException sre) { // Wait for the workers to finish Status status = Status.fromThrowable(sre); - assertEquals(Status.Code.CANCELLED, status.getCode()); + assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode()); // Should have 3 calls before timeout propagates assertEquals(3, nodeCount.get()); @@ -229,7 +229,8 @@ public class CascadingTest { blockingStub.unaryCall((Messages.SimpleRequest) message); } catch (Exception e) { Status status = Status.fromThrowable(e); - if (status.getCode() == Status.Code.CANCELLED) { + if (status.getCode() == Status.Code.CANCELLED + || status.getCode() == Status.Code.DEADLINE_EXCEEDED) { observedCancellations.countDown(); } else if (status.getCode() == Status.Code.ABORTED) { // Propagate aborted back up