stub: Only throw on cancellation for streaming responses

Unary are far more common than streaming, and we're throwing for unary even
though it doesn't help the service. Let's stop doing that. We also stop
throwing in onComplete() for all cases, because it doesn't help any service;
it doesn't stop the service's processing and isn't even all that informative
since the cancellation can happen even after onComplete() is called.
This commit is contained in:
Eric Anderson 2020-10-01 15:10:30 -05:00 committed by GitHub
parent f6c2d221e2
commit 0cd56c29d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 29 deletions

View File

@ -48,7 +48,7 @@ public final class ServerCalls {
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
UnaryMethod<ReqT, RespT> method) { UnaryMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<>(method); return new UnaryServerCallHandler<>(method, false);
} }
/** /**
@ -58,7 +58,7 @@ public final class ServerCalls {
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
ServerStreamingMethod<ReqT, RespT> method) { ServerStreamingMethod<ReqT, RespT> method) {
return new UnaryServerCallHandler<>(method); return new UnaryServerCallHandler<>(method, true);
} }
/** /**
@ -68,7 +68,7 @@ public final class ServerCalls {
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
ClientStreamingMethod<ReqT, RespT> method) { ClientStreamingMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<>(method); return new StreamingServerCallHandler<>(method, false);
} }
/** /**
@ -78,7 +78,7 @@ public final class ServerCalls {
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
BidiStreamingMethod<ReqT, RespT> method) { BidiStreamingMethod<ReqT, RespT> method) {
return new StreamingServerCallHandler<>(method); return new StreamingServerCallHandler<>(method, true);
} }
/** /**
@ -113,10 +113,12 @@ public final class ServerCalls {
implements ServerCallHandler<ReqT, RespT> { implements ServerCallHandler<ReqT, RespT> {
private final UnaryRequestMethod<ReqT, RespT> method; private final UnaryRequestMethod<ReqT, RespT> method;
private final boolean serverStreaming;
// Non private to avoid synthetic class // Non private to avoid synthetic class
UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method) { UnaryServerCallHandler(UnaryRequestMethod<ReqT, RespT> method, boolean serverStreaming) {
this.method = method; this.method = method;
this.serverStreaming = serverStreaming;
} }
@Override @Override
@ -125,7 +127,7 @@ public final class ServerCalls {
call.getMethodDescriptor().getType().clientSendsOneMessage(), call.getMethodDescriptor().getType().clientSendsOneMessage(),
"asyncUnaryRequestCall is only for clientSendsOneMessage methods"); "asyncUnaryRequestCall is only for clientSendsOneMessage methods");
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<>(call); new ServerCallStreamObserverImpl<>(call, serverStreaming);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls. // inbound flow control has no effect on unary calls.
@ -189,9 +191,11 @@ public final class ServerCalls {
@Override @Override
public void onCancel() { public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) { if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run(); responseObserver.onCancelHandler.run();
} else {
// Only trigger exceptions if unable to provide notification via a callback
responseObserver.cancelled = true;
} }
} }
@ -209,16 +213,18 @@ public final class ServerCalls {
implements ServerCallHandler<ReqT, RespT> { implements ServerCallHandler<ReqT, RespT> {
private final StreamingRequestMethod<ReqT, RespT> method; private final StreamingRequestMethod<ReqT, RespT> method;
private final boolean bidi;
// Non private to avoid synthetic class // Non private to avoid synthetic class
StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method) { StreamingServerCallHandler(StreamingRequestMethod<ReqT, RespT> method, boolean bidi) {
this.method = method; this.method = method;
this.bidi = bidi;
} }
@Override @Override
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) { public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<>(call); new ServerCallStreamObserverImpl<>(call, bidi);
StreamObserver<ReqT> requestObserver = method.invoke(responseObserver); StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze(); responseObserver.freeze();
if (responseObserver.autoRequestEnabled) { if (responseObserver.autoRequestEnabled) {
@ -262,14 +268,19 @@ public final class ServerCalls {
@Override @Override
public void onCancel() { public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) { if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run(); responseObserver.onCancelHandler.run();
} else {
// Only trigger exceptions if unable to provide notification via a callback. Even though
// onError would provide notification to the server, we still throw an error since there
// isn't a guaranteed callback available. If the cancellation happened in a different
// order the service could be surprised to see the exception.
responseObserver.cancelled = true;
} }
if (!halfClosed) { if (!halfClosed) {
requestObserver.onError( requestObserver.onError(
Status.CANCELLED Status.CANCELLED
.withDescription("cancelled before receiving half close") .withDescription("client cancelled")
.asRuntimeException()); .asRuntimeException());
} }
} }
@ -300,6 +311,7 @@ public final class ServerCalls {
private static final class ServerCallStreamObserverImpl<ReqT, RespT> private static final class ServerCallStreamObserverImpl<ReqT, RespT>
extends ServerCallStreamObserver<RespT> { extends ServerCallStreamObserver<RespT> {
final ServerCall<ReqT, RespT> call; final ServerCall<ReqT, RespT> call;
private final boolean serverStreamingOrBidi;
volatile boolean cancelled; volatile boolean cancelled;
private boolean frozen; private boolean frozen;
private boolean autoRequestEnabled = true; private boolean autoRequestEnabled = true;
@ -310,8 +322,9 @@ public final class ServerCalls {
private boolean completed = false; private boolean completed = false;
// Non private to avoid synthetic class // Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) { ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call, boolean serverStreamingOrBidi) {
this.call = call; this.call = call;
this.serverStreamingOrBidi = serverStreamingOrBidi;
} }
private void freeze() { private void freeze() {
@ -331,10 +344,17 @@ public final class ServerCalls {
@Override @Override
public void onNext(RespT response) { public void onNext(RespT response) {
if (cancelled) { if (cancelled) {
if (onCancelHandler == null) { if (serverStreamingOrBidi) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); throw Status.CANCELLED
.withDescription("call already cancelled. "
+ "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception")
.asRuntimeException();
} else {
// We choose not to throw for unary responses. The exception is intended to stop servers
// from continuing processing, but for unary responses there is no further processing
// so throwing an exception would not provide a benefit and would increase application
// complexity.
} }
return;
} }
checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed"); checkState(!completed, "Stream is already completed, no further calls are allowed");
@ -357,15 +377,9 @@ public final class ServerCalls {
@Override @Override
public void onCompleted() { public void onCompleted() {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
} else {
call.close(Status.OK, new Metadata()); call.close(Status.OK, new Metadata());
completed = true; completed = true;
} }
}
@Override @Override
public boolean isReady() { public boolean isReady() {

View File

@ -195,12 +195,8 @@ public class ServerCallsTest {
} catch (StatusRuntimeException expected) { } catch (StatusRuntimeException expected) {
// Expected // Expected
} }
try { // No exception
callObserver.get().onCompleted(); callObserver.get().onCompleted();
fail("Expected cancellation exception when onCallHandler not set");
} catch (StatusRuntimeException expected) {
// Expected
}
} }
@Test @Test