core: Delay client listener exception notification until normal close

This should avoid messages being leaked when a Listener throws an exception and
the executor is shut down immediately after the call completes. This is related
to #7105 but a different scenario and we aren't aware of any user having
observed the previous behavior.

Note also this does _not_ fix the similar case of reordering caused by
delayedCancelOnDeadlineExceeded().
This commit is contained in:
Eric Anderson 2020-07-07 11:16:43 -07:00 committed by Eric Anderson
parent cd70dcbfae
commit 825c34f606
2 changed files with 87 additions and 33 deletions

View File

@ -584,12 +584,24 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private class ClientStreamListenerImpl implements ClientStreamListener { private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer; private final Listener<RespT> observer;
private boolean closed; private Status exceptionStatus;
public ClientStreamListenerImpl(Listener<RespT> observer) { public ClientStreamListenerImpl(Listener<RespT> observer) {
this.observer = checkNotNull(observer, "observer"); this.observer = checkNotNull(observer, "observer");
} }
/**
* Cancels call and schedules onClose() notification. May only be called from the application
* thread.
*/
private void exceptionThrown(Status status) {
// Since each RPC can have its own executor, we can only call onClose() when we are sure there
// will be no further callbacks. We set the status here and overwrite the onClose() details
// when it arrives.
exceptionStatus = status;
stream.cancel(status);
}
@Override @Override
public void headersRead(final Metadata headers) { public void headersRead(final Metadata headers) {
PerfMark.startTask("ClientStreamListener.headersRead", tag); PerfMark.startTask("ClientStreamListener.headersRead", tag);
@ -612,16 +624,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
private void runInternal() { private void runInternal() {
if (closed) { if (exceptionStatus != null) {
return; return;
} }
try { try {
observer.onHeaders(headers); observer.onHeaders(headers);
} catch (Throwable t) { } catch (Throwable t) {
Status status = exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
stream.cancel(status);
close(status, new Metadata());
} }
} }
} }
@ -655,7 +665,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
private void runInternal() { private void runInternal() {
if (closed) { if (exceptionStatus != null) {
GrpcUtil.closeQuietly(producer); GrpcUtil.closeQuietly(producer);
return; return;
} }
@ -672,10 +682,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
} catch (Throwable t) { } catch (Throwable t) {
GrpcUtil.closeQuietly(producer); GrpcUtil.closeQuietly(producer);
Status status = exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."); Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
stream.cancel(status);
close(status, new Metadata());
} }
} }
} }
@ -687,20 +695,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
} }
/**
* Must be called from application thread.
*/
private void close(Status status, Metadata trailers) {
closed = true;
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
channelCallsTracer.reportCallEnded(status.isOk());
}
}
@Override @Override
public void closed(Status status, Metadata trailers) { public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers); closed(status, RpcProgress.PROCESSED, trailers);
@ -752,11 +746,25 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
private void runInternal() { private void runInternal() {
if (closed) { Status status = savedStatus;
// We intentionally don't keep the status or metadata from the server. Metadata trailers = savedTrailers;
return; if (exceptionStatus != null) {
// Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
// However the cancel is racy and this closed() may have already been queued when the
// cancellation occurred. Since other calls like onMessage() will throw away data if
// exceptionStatus != null, it is semantically essential that we _not_ use a status
// provided by the server.
status = exceptionStatus;
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
}
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
channelCallsTracer.reportCallEnded(status.isOk());
} }
close(savedStatus, savedTrailers);
} }
} }
@ -789,13 +797,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
private void runInternal() { private void runInternal() {
if (exceptionStatus != null) {
return;
}
try { try {
observer.onReady(); observer.onReady();
} catch (Throwable t) { } catch (Throwable t) {
Status status = exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
stream.cancel(status);
close(status, new Metadata());
} }
} }
} }

View File

@ -256,6 +256,51 @@ public class ClientCallImplTest {
verify(stream).cancel(same(callListenerStatus)); verify(stream).cancel(same(callListenerStatus));
} }
@Test
public void exceptionInOnHeadersHasOnCloseQueuedLast() {
class PointOfNoReturnExecutor implements Executor {
boolean rejectNewRunnables;
@Override public void execute(Runnable command) {
assertThat(rejectNewRunnables).isFalse();
command.run();
}
}
final PointOfNoReturnExecutor executor = new PointOfNoReturnExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
callListener = new NoopClientCall.NoopClientCallListener<Void>() {
private final RuntimeException failure = new RuntimeException("bad");
@Override public void onHeaders(Metadata metadata) {
throw failure;
}
@Override public void onClose(Status status, Metadata metadata) {
verify(stream).cancel(same(status));
assertThat(status.getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(status.getCause()).isSameInstanceAs(failure);
// At the point onClose() is called the user may shut down the executor, so no further
// Runnables may be scheduled. The only thread-safe way of guaranteeing that is for
// onClose() to be queued last.
executor.rejectNewRunnables = true;
}
};
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.headersRead(new Metadata());
streamListener.closed(Status.OK, new Metadata());
}
@Test @Test
public void exceptionInOnReadyTakesPrecedenceOverServer() { public void exceptionInOnReadyTakesPrecedenceOverServer() {
DelayedExecutor executor = new DelayedExecutor(); DelayedExecutor executor = new DelayedExecutor();