core: delay sending cancel request on client-side when deadline expires (#6328)

This commit is contained in:
Ran 2019-12-16 09:58:36 -08:00 committed by GitHub
parent 4357f7f159
commit f6544bf95c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 99 deletions

View File

@ -68,13 +68,19 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
= "gzip".getBytes(Charset.forName("US-ASCII"));
// When a deadline is exceeded, there is a race between the server receiving the cancellation from
// the client and the server cancelling the stream itself. If the client's cancellation is
// received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED.
// This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common
// monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation.
@VisibleForTesting
static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final Executor callExecutor;
private final CallTracer channelCallsTracer;
private final Context context;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private final CallOptions callOptions;
private final boolean retryEnabled;
@ -83,11 +89,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private boolean cancelCalled;
private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider;
private final CancellationListener cancellationListener = new ContextCancellationListener();
private ContextCancellationListener cancellationListener;
private final ScheduledExecutorService deadlineCancellationExecutor;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
private volatile ScheduledFuture<?> deadlineCancellationNotifyApplicationFuture;
private volatile ScheduledFuture<?> deadlineCancellationSendToServerFuture;
private boolean observerClosed = false;
ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
@ -117,9 +126,20 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
private final class ContextCancellationListener implements CancellationListener {
private Listener<RespT> observer;
private ContextCancellationListener(Listener<RespT> observer) {
this.observer = observer;
}
@Override
public void cancelled(Context context) {
stream.cancel(statusFromCancelled(context));
if (context.getDeadline() == null || !context.getDeadline().isExpired()) {
stream.cancel(statusFromCancelled(context));
} else {
Status status = statusFromCancelled(context);
delayedCancelOnDeadlineExceeded(status, observer);
}
}
}
@ -203,18 +223,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
// Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
class ClosedByContext extends ContextRunnable {
ClosedByContext() {
super(context);
}
@Override
public void runInContext() {
closeObserver(observer, statusFromCancelled(context), new Metadata());
}
}
callExecutor.execute(new ClosedByContext());
executeCloseObserverInContext(observer, statusFromCancelled(context));
return;
}
final String compressorName = callOptions.getCompressor();
@ -223,22 +232,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
compressor = compressorRegistry.lookupCompressor(compressorName);
if (compressor == null) {
stream = NoopClientStream.INSTANCE;
class ClosedByNotFoundCompressor extends ContextRunnable {
ClosedByNotFoundCompressor() {
super(context);
}
@Override
public void runInContext() {
closeObserver(
observer,
Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName)),
new Metadata());
}
}
callExecutor.execute(new ClosedByNotFoundCompressor());
Status status = Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName));
executeCloseObserverInContext(observer, status);
return;
}
} else {
@ -287,6 +283,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
stream.setDecompressorRegistry(decompressorRegistry);
channelCallsTracer.reportCallStarted();
cancellationListener = new ContextCancellationListener(observer);
stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if
@ -298,8 +295,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !effectiveDeadline.equals(context.getDeadline())
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
&& deadlineCancellationExecutor != null
// if already expired deadline let failing stream handle
&& !(stream instanceof FailingClientStream)) {
deadlineCancellationNotifyApplicationFuture =
startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
@ -333,46 +333,98 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(cancellationListener);
ScheduledFuture<?> f = deadlineCancellationFuture;
ScheduledFuture<?> f = deadlineCancellationSendToServerFuture;
if (f != null) {
f.cancel(false);
}
f = deadlineCancellationNotifyApplicationFuture;
if (f != null) {
f.cancel(false);
}
}
private class DeadlineTimer implements Runnable {
private final long remainingNanos;
private ScheduledFuture<?> startDeadlineNotifyApplicationTimer(Deadline deadline,
final Listener<RespT> observer) {
final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
DeadlineTimer(long remainingNanos) {
this.remainingNanos = remainingNanos;
}
@Override
public void run() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
class DeadlineExceededNotifyApplicationTimer implements Runnable {
@Override
public void run() {
Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos);
delayedCancelOnDeadlineExceeded(status, observer);
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
}
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()),
remainingNanos,
TimeUnit.NANOSECONDS);
}
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) {
final InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);
return DEADLINE_EXCEEDED.augmentDescription(buf.toString());
}
private void delayedCancelOnDeadlineExceeded(final Status status, Listener<RespT> observer) {
if (deadlineCancellationSendToServerFuture != null) {
return;
}
class DeadlineExceededSendCancelToServerTimer implements Runnable {
@Override
public void run() {
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
stream.cancel(status);
}
}
// This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a
// stream multiple time is safe, the race here is fine.
deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
TimeUnit.NANOSECONDS);
executeCloseObserverInContext(observer, status);
}
private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
class CloseInContext extends ContextRunnable {
CloseInContext() {
super(context);
}
@Override
public void runInContext() {
closeObserver(observer, status, new Metadata());
}
}
callExecutor.execute(new CloseInContext());
}
private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
if (!observerClosed) {
observerClosed = true;
observer.onClose(status, trailers);
}
}
@Nullable
@ -517,10 +569,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
return Attributes.EMPTY;
}
private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
observer.onClose(status, trailers);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("method", method).toString();

View File

@ -17,6 +17,7 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -54,6 +55,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.testing.TestMethodDescriptors;
@ -131,6 +133,9 @@ public class ClientCallImplTest {
@Captor
private ArgumentCaptor<Status> statusArgumentCaptor;
@Captor
private ArgumentCaptor<Metadata> metadataArgumentCaptor;
private CallOptions baseCallOptions;
@Before
@ -165,7 +170,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -187,7 +192,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -225,7 +230,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -261,7 +266,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -296,7 +301,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@ -320,7 +325,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@ -337,7 +342,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
final Metadata metadata = new Metadata();
@ -356,7 +361,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@ -527,7 +532,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
context.detach(previous);
@ -605,7 +610,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
@ -635,7 +640,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous);
@ -680,7 +685,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(transport, times(0))
@ -705,7 +710,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@ -730,7 +735,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@ -755,7 +760,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
context.detach(origContext);
@ -776,7 +781,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
@ -794,7 +799,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class));
@ -808,17 +813,30 @@ public class ClientCallImplTest {
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
baseCallOptions.withDeadline(
Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())),
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
verify(stream, times(1)).cancel(statusCaptor.capture());
// Verify cancel sent to application when deadline just past
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verify(stream, never()).cancel(statusCaptor.capture());
fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1);
verify(stream, never()).cancel(any(Status.class));
// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(1);
verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
@ -828,8 +846,8 @@ public class ClientCallImplTest {
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
Context context = Context.current()
.withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker());
Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
@ -839,15 +857,22 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
context.detach(origContext);
call.start(callListener, new Metadata());
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);
fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
verify(stream, never()).cancel(statusCaptor.capture());
// verify app is notified.
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription()).contains("context timed out");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verify(stream, times(1)).cancel(statusCaptor.capture());
// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS);
verify(stream).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
}
@ -863,7 +888,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
@ -888,7 +913,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
Metadata headers = new Metadata();
@ -906,7 +931,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
/* retryEnabled= */ false);
final Exception cause = new Exception();
ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() {
@ -944,7 +969,7 @@ public class ClientCallImplTest {
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
@ -957,7 +982,7 @@ public class ClientCallImplTest {
public void getAttributes() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method, MoreExecutors.directExecutor(), baseCallOptions, provider,
deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */);
deadlineCancellationExecutor, channelCallTracer, /* retryEnabled= */ false);
Attributes attrs =
Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build();
when(stream.getAttributes()).thenReturn(attrs);

View File

@ -3477,7 +3477,7 @@ public class ManagedChannelImplTest {
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);
timer.forwardTime(1234, TimeUnit.SECONDS);
timer.forwardTime(5, TimeUnit.SECONDS);
executor.runDueTasks();
try {
@ -3488,6 +3488,9 @@ public class ManagedChannelImplTest {
}
mychannel.shutdownNow();
// Now for Deadline_exceeded, stream shutdown is delayed, calling shutdownNow() on a open stream
// will add a task to executor. Cleaning that task here.
executor.runDueTasks();
}
@Deprecated