core: Assure that context cancellationCause is set (#9501)

core: Assure that context cancellationCause is set

Makes sure that whenever a context is in a cancelled state, we also have
a cancellationCause.
This commit is contained in:
Terry Wilson 2022-08-31 10:43:22 -07:00 committed by GitHub
parent 4b4cb0bd3b
commit 1f33fe6383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 8 deletions

View File

@ -16,6 +16,8 @@
package io.grpc; package io.grpc;
import javax.annotation.Nullable;
/** /**
* Accesses internal data. Do not use this. * Accesses internal data. Do not use this.
*/ */
@ -34,4 +36,14 @@ public final class InternalStatus {
*/ */
@Internal @Internal
public static final Metadata.Key<Status> CODE_KEY = Status.CODE_KEY; public static final Metadata.Key<Status> CODE_KEY = Status.CODE_KEY;
/**
* Create a new {@link StatusRuntimeException} with the internal option of skipping the filling
* of the stack trace.
*/
@Internal
public static final StatusRuntimeException asRuntimeException(Status status,
@Nullable Metadata trailers, boolean fillInStackTrace) {
return new StatusRuntimeException(status, trailers, fillInStackTrace);
}
} }

View File

@ -35,6 +35,7 @@ import io.grpc.CompressorRegistry;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.InternalDecompressorRegistry; import io.grpc.InternalDecompressorRegistry;
import io.grpc.InternalStatus;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel; import io.grpc.SecurityLevel;
@ -368,19 +369,22 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
} }
private void closedInternal(Status status) { private void closedInternal(Status status) {
Throwable cancelCause = null;
try { try {
if (status.isOk()) { if (status.isOk()) {
listener.onComplete(); listener.onComplete();
} else { } else {
call.cancelled = true; call.cancelled = true;
listener.onCancel(); listener.onCancel();
// The status will not have a cause in all failure scenarios but we want to make sure
// we always cancel the context with one to keep the context cancelled state consistent.
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
} }
} finally { } finally {
// Cancel context after delivering RPC closure notification to allow the application to // Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called. // clean up and update any state based on whether onComplete or onCancel was called.
// Note that in failure situations JumpToApplicationThreadServerStreamListener has already context.cancel(cancelCause);
// closed the context. In these situations this cancel() call will be a no-op.
context.cancel(null);
} }
} }

View File

@ -45,6 +45,7 @@ import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented; import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalServerInterceptors; import io.grpc.InternalServerInterceptors;
import io.grpc.InternalStatus;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.ServerCall; import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier; import io.grpc.ServerCallExecutorSupplier;
@ -894,9 +895,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
// For cancellations, promptly inform any users of the context that their work should be // For cancellations, promptly inform any users of the context that their work should be
// aborted. Otherwise, we can wait until pending work is done. // aborted. Otherwise, we can wait until pending work is done.
if (!status.isOk()) { if (!status.isOk()) {
// Since status was not OK we know that the call did not complete and got cancelled. To
// reflect this on the context we need to close it with a cause exception. Since not every
// failed status has an exception we will create one here if needed.
Throwable cancelCause = status.getCause();
if (cancelCause == null) {
cancelCause = InternalStatus.asRuntimeException(
Status.CANCELLED.withDescription("RPC cancelled"), null, false);
}
// The callExecutor might be busy doing user work. To avoid waiting, use an executor that // The callExecutor might be busy doing user work. To avoid waiting, use an executor that
// is not serializing. // is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause())); cancelExecutor.execute(new ContextCloser(context, cancelCause));
} }
final Link link = PerfMark.linkOut(); final Link link = PerfMark.linkOut();

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Charsets.UTF_8;
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -424,7 +425,7 @@ public class ServerCallImplTest {
verify(callListener).onCancel(); verify(callListener).onCancel();
assertTrue(context.isCancelled()); assertTrue(context.isCancelled());
assertNull(context.cancellationCause()); assertNotNull(context.cancellationCause());
} }
@Test @Test

View File

@ -130,7 +130,7 @@ public class ServerImplTest {
private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added"); private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added");
private static final Context.CancellableContext SERVER_CONTEXT = private static final Context.CancellableContext SERVER_CONTEXT =
Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation(); Context.ROOT.withValue(SERVER_ONLY, "yes").withCancellation();
private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FITLER = private static final FakeClock.TaskFilter CONTEXT_CLOSER_TASK_FILTER =
new FakeClock.TaskFilter() { new FakeClock.TaskFilter() {
@Override @Override
public boolean shouldAccept(Runnable runnable) { public boolean shouldAccept(Runnable runnable) {
@ -1085,7 +1085,7 @@ public class ServerImplTest {
assertTrue(onHalfCloseCalled.get()); assertTrue(onHalfCloseCalled.get());
streamListener.closed(Status.CANCELLED); streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks()); assertEquals(2, executor.runDueTasks());
assertTrue(onCancelCalled.get()); assertTrue(onCancelCalled.get());
@ -1179,10 +1179,11 @@ public class ServerImplTest {
assertFalse(callReference.get().isCancelled()); assertFalse(callReference.get().isCancelled());
assertFalse(context.get().isCancelled()); assertFalse(context.get().isCancelled());
streamListener.closed(Status.CANCELLED); streamListener.closed(Status.CANCELLED);
assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FITLER)); assertEquals(1, executor.numPendingTasks(CONTEXT_CLOSER_TASK_FILTER));
assertEquals(2, executor.runDueTasks()); assertEquals(2, executor.runDueTasks());
assertTrue(callReference.get().isCancelled()); assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled()); assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get()); assertTrue(contextCancelled.get());
} }
@ -1208,6 +1209,7 @@ public class ServerImplTest {
assertEquals(1, executor.runDueTasks()); assertEquals(1, executor.runDueTasks());
assertFalse(callReference.get().isCancelled()); assertFalse(callReference.get().isCancelled());
assertTrue(context.get().isCancelled()); assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNull();
assertTrue(contextCancelled.get()); assertTrue(contextCancelled.get());
} }
@ -1228,6 +1230,7 @@ public class ServerImplTest {
assertTrue(callReference.get().isCancelled()); assertTrue(callReference.get().isCancelled());
assertTrue(context.get().isCancelled()); assertTrue(context.get().isCancelled());
assertThat(context.get().cancellationCause()).isNotNull();
assertTrue(contextCancelled.get()); assertTrue(contextCancelled.get());
} }