diff --git a/api/src/main/java/io/grpc/InternalStatus.java b/api/src/main/java/io/grpc/InternalStatus.java index 9f6854a2de..b6549bb435 100644 --- a/api/src/main/java/io/grpc/InternalStatus.java +++ b/api/src/main/java/io/grpc/InternalStatus.java @@ -16,6 +16,8 @@ package io.grpc; +import javax.annotation.Nullable; + /** * Accesses internal data. Do not use this. */ @@ -34,4 +36,14 @@ public final class InternalStatus { */ @Internal public static final Metadata.Key CODE_KEY = Status.CODE_KEY; + + /** + * Create a new {@link StatusRuntimeException} with the internal option of skipping the filling + * of the stack trace. + */ + @Internal + public static final StatusRuntimeException asRuntimeException(Status status, + @Nullable Metadata trailers, boolean fillInStackTrace) { + return new StatusRuntimeException(status, trailers, fillInStackTrace); + } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 0b68071529..47ffdf9caa 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -35,6 +35,7 @@ import io.grpc.CompressorRegistry; import io.grpc.Context; import io.grpc.DecompressorRegistry; import io.grpc.InternalDecompressorRegistry; +import io.grpc.InternalStatus; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.SecurityLevel; @@ -368,19 +369,22 @@ final class ServerCallImpl extends ServerCall { } private void closedInternal(Status status) { + Throwable cancelCause = null; try { if (status.isOk()) { listener.onComplete(); } else { call.cancelled = true; listener.onCancel(); + // The status will not have a cause in all failure scenarios but we want to make sure + // we always cancel the context with one to keep the context cancelled state consistent. + cancelCause = InternalStatus.asRuntimeException( + Status.CANCELLED.withDescription("RPC cancelled"), null, false); } } finally { // Cancel context after delivering RPC closure notification to allow the application to // clean up and update any state based on whether onComplete or onCancel was called. - // Note that in failure situations JumpToApplicationThreadServerStreamListener has already - // closed the context. In these situations this cancel() call will be a no-op. - context.cancel(null); + context.cancel(cancelCause); } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 6bfe2d38ab..bbd52c14bb 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -45,6 +45,7 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalServerInterceptors; +import io.grpc.InternalStatus; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallExecutorSupplier; @@ -894,9 +895,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume // For cancellations, promptly inform any users of the context that their work should be // aborted. Otherwise, we can wait until pending work is done. if (!status.isOk()) { + // Since status was not OK we know that the call did not complete and got cancelled. To + // reflect this on the context we need to close it with a cause exception. Since not every + // failed status has an exception we will create one here if needed. + Throwable cancelCause = status.getCause(); + if (cancelCause == null) { + cancelCause = InternalStatus.asRuntimeException( + Status.CANCELLED.withDescription("RPC cancelled"), null, false); + } + // The callExecutor might be busy doing user work. To avoid waiting, use an executor that // is not serializing. - cancelExecutor.execute(new ContextCloser(context, status.getCause())); + cancelExecutor.execute(new ContextCloser(context, cancelCause)); } final Link link = PerfMark.linkOut(); diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index e8087ed2aa..4818c6c201 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -20,6 +20,7 @@ import static com.google.common.base.Charsets.UTF_8; import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -424,7 +425,7 @@ public class ServerCallImplTest { verify(callListener).onCancel(); assertTrue(context.isCancelled()); - assertNull(context.cancellationCause()); + assertNotNull(context.cancellationCause()); } @Test diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 0f5c510f97..d3c07787b6 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -130,7 +130,7 @@ public class ServerImplTest { private static final Context.Key SERVER_TRACER_ADDED_KEY = Context.key("tracer-added"); private static final Context.CancellableContext SERVER_CONTEXT = Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation(); - private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER = + private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FILTER = new FakeClock.TaskFilter() { @Override public boolean shouldAccept(Runnable runnable) { @@ -1085,7 +1085,7 @@ public class ServerImplTest { assertTrue(onHalfCloseCalled.get()); streamListener.closed(Status.CANCELLED); - assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); + assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER)); assertEquals(2, executor.runDueTasks()); assertTrue(onCancelCalled.get()); @@ -1179,10 +1179,11 @@ public class ServerImplTest { assertFalse(callReference.get().isCancelled()); assertFalse(context.get().isCancelled()); streamListener.closed(Status.CANCELLED); - assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); + assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER)); assertEquals(2, executor.runDueTasks()); assertTrue(callReference.get().isCancelled()); assertTrue(context.get().isCancelled()); + assertThat(context.get().cancellationCause()).isNotNull(); assertTrue(contextCancelled.get()); } @@ -1208,6 +1209,7 @@ public class ServerImplTest { assertEquals(1, executor.runDueTasks()); assertFalse(callReference.get().isCancelled()); assertTrue(context.get().isCancelled()); + assertThat(context.get().cancellationCause()).isNull(); assertTrue(contextCancelled.get()); } @@ -1228,6 +1230,7 @@ public class ServerImplTest { assertTrue(callReference.get().isCancelled()); assertTrue(context.get().isCancelled()); + assertThat(context.get().cancellationCause()).isNotNull(); assertTrue(contextCancelled.get()); }