From 0cd56c29d61f1f09bbed7e15cd8f4b6c25cdda85 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 1 Oct 2020 15:10:30 -0500 Subject: [PATCH] stub: Only throw on cancellation for streaming responses Unary are far more common than streaming, and we're throwing for unary even though it doesn't help the service. Let's stop doing that. We also stop throwing in onComplete() for all cases, because it doesn't help any service; it doesn't stop the service's processing and isn't even all that informative since the cancellation can happen even after onComplete() is called. --- .../main/java/io/grpc/stub/ServerCalls.java | 60 ++++++++++++------- .../java/io/grpc/stub/ServerCallsTest.java | 8 +-- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index bb9720984d..ba08139b71 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -48,7 +48,7 @@ public final class ServerCalls { */ public static ServerCallHandler asyncUnaryCall( UnaryMethod method) { - return new UnaryServerCallHandler<>(method); + return new UnaryServerCallHandler<>(method, false); } /** @@ -58,7 +58,7 @@ public final class ServerCalls { */ public static ServerCallHandler asyncServerStreamingCall( ServerStreamingMethod method) { - return new UnaryServerCallHandler<>(method); + return new UnaryServerCallHandler<>(method, true); } /** @@ -68,7 +68,7 @@ public final class ServerCalls { */ public static ServerCallHandler asyncClientStreamingCall( ClientStreamingMethod method) { - return new StreamingServerCallHandler<>(method); + return new StreamingServerCallHandler<>(method, false); } /** @@ -78,7 +78,7 @@ public final class ServerCalls { */ public static ServerCallHandler asyncBidiStreamingCall( BidiStreamingMethod method) { - return new StreamingServerCallHandler<>(method); + return new StreamingServerCallHandler<>(method, true); } /** @@ -113,10 +113,12 @@ public final class ServerCalls { implements ServerCallHandler { private final UnaryRequestMethod method; + private final boolean serverStreaming; // Non private to avoid synthetic class - UnaryServerCallHandler(UnaryRequestMethod method) { + UnaryServerCallHandler(UnaryRequestMethod method, boolean serverStreaming) { this.method = method; + this.serverStreaming = serverStreaming; } @Override @@ -125,7 +127,7 @@ public final class ServerCalls { call.getMethodDescriptor().getType().clientSendsOneMessage(), "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl<>(call); + new ServerCallStreamObserverImpl<>(call, serverStreaming); // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client // sends more than 1 requests, ServerCall will catch it. Note that disabling auto // inbound flow control has no effect on unary calls. @@ -189,9 +191,11 @@ public final class ServerCalls { @Override public void onCancel() { - responseObserver.cancelled = true; if (responseObserver.onCancelHandler != null) { responseObserver.onCancelHandler.run(); + } else { + // Only trigger exceptions if unable to provide notification via a callback + responseObserver.cancelled = true; } } @@ -209,16 +213,18 @@ public final class ServerCalls { implements ServerCallHandler { private final StreamingRequestMethod method; + private final boolean bidi; // Non private to avoid synthetic class - StreamingServerCallHandler(StreamingRequestMethod method) { + StreamingServerCallHandler(StreamingRequestMethod method, boolean bidi) { this.method = method; + this.bidi = bidi; } @Override public ServerCall.Listener startCall(ServerCall call, Metadata headers) { ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl<>(call); + new ServerCallStreamObserverImpl<>(call, bidi); StreamObserver requestObserver = method.invoke(responseObserver); responseObserver.freeze(); if (responseObserver.autoRequestEnabled) { @@ -262,14 +268,19 @@ public final class ServerCalls { @Override public void onCancel() { - responseObserver.cancelled = true; if (responseObserver.onCancelHandler != null) { responseObserver.onCancelHandler.run(); + } else { + // Only trigger exceptions if unable to provide notification via a callback. Even though + // onError would provide notification to the server, we still throw an error since there + // isn't a guaranteed callback available. If the cancellation happened in a different + // order the service could be surprised to see the exception. + responseObserver.cancelled = true; } if (!halfClosed) { requestObserver.onError( Status.CANCELLED - .withDescription("cancelled before receiving half close") + .withDescription("client cancelled") .asRuntimeException()); } } @@ -300,6 +311,7 @@ public final class ServerCalls { private static final class ServerCallStreamObserverImpl extends ServerCallStreamObserver { final ServerCall call; + private final boolean serverStreamingOrBidi; volatile boolean cancelled; private boolean frozen; private boolean autoRequestEnabled = true; @@ -310,8 +322,9 @@ public final class ServerCalls { private boolean completed = false; // Non private to avoid synthetic class - ServerCallStreamObserverImpl(ServerCall call) { + ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { this.call = call; + this.serverStreamingOrBidi = serverStreamingOrBidi; } private void freeze() { @@ -331,10 +344,17 @@ public final class ServerCalls { @Override public void onNext(RespT response) { if (cancelled) { - if (onCancelHandler == null) { - throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); + if (serverStreamingOrBidi) { + throw Status.CANCELLED + .withDescription("call already cancelled. " + + "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception") + .asRuntimeException(); + } else { + // We choose not to throw for unary responses. The exception is intended to stop servers + // from continuing processing, but for unary responses there is no further processing + // so throwing an exception would not provide a benefit and would increase application + // complexity. } - return; } checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); checkState(!completed, "Stream is already completed, no further calls are allowed"); @@ -357,14 +377,8 @@ public final class ServerCalls { @Override public void onCompleted() { - if (cancelled) { - if (onCancelHandler == null) { - throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); - } - } else { - call.close(Status.OK, new Metadata()); - completed = true; - } + call.close(Status.OK, new Metadata()); + completed = true; } @Override diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 29981932fb..f9ceb82166 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -195,12 +195,8 @@ public class ServerCallsTest { } catch (StatusRuntimeException expected) { // Expected } - try { - callObserver.get().onCompleted(); - fail("Expected cancellation exception when onCallHandler not set"); - } catch (StatusRuntimeException expected) { - // Expected - } + // No exception + callObserver.get().onCompleted(); } @Test