mirror of https://github.com/grpc/grpc-java.git
core: Provide DEADLINE_EXCEEDED insights for context deadline
We provided extra details when the RPC is killed by CallOptions' Deadline, but didn't do the same for Context. To avoid duplicating code, things were restructured, including the threading. There are more code flows now, but I think the multi-threading came out more obvious and less error-prone. I didn't change the status when the deadline is already expired, because the text is shared with DelayedClientCall and AbstractInteropTest doesn't distinguish between the two cases. As seen at b/300991330
This commit is contained in:
parent
fafd99db52
commit
0e31ac9303
|
|
@ -28,7 +28,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
|
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
|
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
||||||
import static java.lang.Math.max;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
|
|
@ -62,6 +61,7 @@ import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
@ -82,16 +82,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
private final boolean callExecutorIsDirect;
|
private final boolean callExecutorIsDirect;
|
||||||
private final CallTracer channelCallsTracer;
|
private final CallTracer channelCallsTracer;
|
||||||
private final Context context;
|
private final Context context;
|
||||||
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
private CancellationHandler cancellationHandler;
|
||||||
private final boolean unaryRequest;
|
private final boolean unaryRequest;
|
||||||
private CallOptions callOptions;
|
private CallOptions callOptions;
|
||||||
private ClientStream stream;
|
private ClientStream stream;
|
||||||
private volatile boolean cancelListenersShouldBeRemoved;
|
|
||||||
private boolean cancelCalled;
|
private boolean cancelCalled;
|
||||||
private boolean halfCloseCalled;
|
private boolean halfCloseCalled;
|
||||||
private final ClientStreamProvider clientStreamProvider;
|
private final ClientStreamProvider clientStreamProvider;
|
||||||
private final ContextCancellationListener cancellationListener =
|
|
||||||
new ContextCancellationListener();
|
|
||||||
private final ScheduledExecutorService deadlineCancellationExecutor;
|
private final ScheduledExecutorService deadlineCancellationExecutor;
|
||||||
private boolean fullStreamDecompression;
|
private boolean fullStreamDecompression;
|
||||||
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
||||||
|
|
@ -128,13 +125,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
PerfMark.event("ClientCall.<init>", tag);
|
PerfMark.event("ClientCall.<init>", tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class ContextCancellationListener implements CancellationListener {
|
|
||||||
@Override
|
|
||||||
public void cancelled(Context context) {
|
|
||||||
stream.cancel(statusFromCancelled(context));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provider of {@link ClientStream}s.
|
* Provider of {@link ClientStream}s.
|
||||||
*/
|
*/
|
||||||
|
|
@ -252,21 +242,21 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
|
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
|
||||||
|
|
||||||
Deadline effectiveDeadline = effectiveDeadline();
|
Deadline effectiveDeadline = effectiveDeadline();
|
||||||
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
|
boolean contextIsDeadlineSource = effectiveDeadline != null
|
||||||
|
&& effectiveDeadline.equals(context.getDeadline());
|
||||||
|
cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
|
||||||
|
boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
|
||||||
if (!deadlineExceeded) {
|
if (!deadlineExceeded) {
|
||||||
logIfContextNarrowedTimeout(
|
|
||||||
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
|
|
||||||
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
|
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
|
||||||
} else {
|
} else {
|
||||||
ClientStreamTracer[] tracers =
|
ClientStreamTracer[] tracers =
|
||||||
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
|
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
|
||||||
String deadlineName =
|
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
|
||||||
isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
|
|
||||||
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||||
String description = String.format(
|
String description = String.format(
|
||||||
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
|
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
|
||||||
+ "Name resolution delay %.9f seconds.", deadlineName,
|
+ "Name resolution delay %.9f seconds.", deadlineName,
|
||||||
effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
|
cancellationHandler.remainingNanos / NANO_TO_SECS,
|
||||||
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
|
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
|
||||||
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
|
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
|
||||||
}
|
}
|
||||||
|
|
@ -298,21 +288,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// they receive cancel before start. Issue #1343 has more details
|
// they receive cancel before start. Issue #1343 has more details
|
||||||
|
|
||||||
// Propagate later Context cancellation to the remote side.
|
// Propagate later Context cancellation to the remote side.
|
||||||
context.addListener(cancellationListener, directExecutor());
|
cancellationHandler.setUp();
|
||||||
if (effectiveDeadline != null
|
|
||||||
// If the context has the effective deadline, we don't need to schedule an extra task.
|
|
||||||
&& !effectiveDeadline.equals(context.getDeadline())
|
|
||||||
// If the channel has been terminated, we don't need to schedule an extra task.
|
|
||||||
&& deadlineCancellationExecutor != null) {
|
|
||||||
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
|
|
||||||
}
|
|
||||||
if (cancelListenersShouldBeRemoved) {
|
|
||||||
// Race detected! ClientStreamListener.closed may have been called before
|
|
||||||
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
|
|
||||||
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
|
|
||||||
// was cancelled.
|
|
||||||
removeContextListenerAndCancelDeadlineFuture();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyMethodConfig() {
|
private void applyMethodConfig() {
|
||||||
|
|
@ -354,53 +330,76 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void logIfContextNarrowedTimeout(
|
private final class CancellationHandler implements Runnable, CancellationListener {
|
||||||
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
|
private final boolean contextIsDeadlineSource;
|
||||||
@Nullable Deadline callDeadline) {
|
private final boolean hasDeadline;
|
||||||
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|
private final long remainingNanos;
|
||||||
|| !effectiveDeadline.equals(outerCallDeadline)) {
|
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
||||||
|
private volatile boolean tearDownCalled;
|
||||||
|
|
||||||
|
CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
|
||||||
|
this.contextIsDeadlineSource = contextIsDeadlineSource;
|
||||||
|
if (deadline == null) {
|
||||||
|
hasDeadline = false;
|
||||||
|
remainingNanos = 0;
|
||||||
|
} else {
|
||||||
|
hasDeadline = true;
|
||||||
|
remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void setUp() {
|
||||||
|
if (tearDownCalled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (hasDeadline
|
||||||
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
|
// If the context has the effective deadline, we don't need to schedule an extra task.
|
||||||
StringBuilder builder = new StringBuilder(String.format(
|
&& !contextIsDeadlineSource
|
||||||
Locale.US,
|
// If the channel has been terminated, we don't need to schedule an extra task.
|
||||||
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
|
&& deadlineCancellationExecutor != null) {
|
||||||
if (callDeadline == null) {
|
deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
|
||||||
builder.append(" Explicit call timeout was not set.");
|
new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
|
||||||
} else {
|
|
||||||
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
|
|
||||||
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
|
|
||||||
}
|
}
|
||||||
|
context.addListener(this, directExecutor());
|
||||||
log.fine(builder.toString());
|
if (tearDownCalled) {
|
||||||
}
|
// Race detected! Re-run to make sure the future is cancelled and context listener removed
|
||||||
|
tearDown();
|
||||||
private void removeContextListenerAndCancelDeadlineFuture() {
|
|
||||||
context.removeListener(cancellationListener);
|
|
||||||
ScheduledFuture<?> f = deadlineCancellationFuture;
|
|
||||||
if (f != null) {
|
|
||||||
f.cancel(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DeadlineTimer implements Runnable {
|
// May be called multiple times, and race with setUp()
|
||||||
private final long remainingNanos;
|
void tearDown() {
|
||||||
|
tearDownCalled = true;
|
||||||
|
ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
|
||||||
|
if (deadlineCancellationFuture != null) {
|
||||||
|
deadlineCancellationFuture.cancel(false);
|
||||||
|
}
|
||||||
|
context.removeListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
DeadlineTimer(long remainingNanos) {
|
@Override
|
||||||
this.remainingNanos = remainingNanos;
|
public void cancelled(Context context) {
|
||||||
|
if (hasDeadline && contextIsDeadlineSource
|
||||||
|
&& context.cancellationCause() instanceof TimeoutException) {
|
||||||
|
stream.cancel(formatDeadlineExceededStatus());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
stream.cancel(statusFromCancelled(context));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
InsightBuilder insight = new InsightBuilder();
|
stream.cancel(formatDeadlineExceededStatus());
|
||||||
stream.appendTimeoutInsight(insight);
|
}
|
||||||
|
|
||||||
|
Status formatDeadlineExceededStatus() {
|
||||||
// DelayedStream.cancel() is safe to call from a thread that is different from where the
|
// DelayedStream.cancel() is safe to call from a thread that is different from where the
|
||||||
// stream is created.
|
// stream is created.
|
||||||
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
|
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
|
||||||
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
|
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
|
||||||
|
|
||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
|
buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
|
||||||
buf.append(" deadline exceeded after ");
|
buf.append(" deadline exceeded after ");
|
||||||
if (remainingNanos < 0) {
|
if (remainingNanos < 0) {
|
||||||
buf.append('-');
|
buf.append('-');
|
||||||
|
|
@ -411,16 +410,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||||
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
|
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds.",
|
||||||
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
|
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
|
||||||
|
if (stream != null) {
|
||||||
|
InsightBuilder insight = new InsightBuilder();
|
||||||
|
stream.appendTimeoutInsight(insight);
|
||||||
|
buf.append(" ");
|
||||||
buf.append(insight);
|
buf.append(insight);
|
||||||
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
|
|
||||||
}
|
}
|
||||||
|
return DEADLINE_EXCEEDED.withDescription(buf.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
|
|
||||||
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
|
|
||||||
return deadlineCancellationExecutor.schedule(
|
|
||||||
new LogExceptionRunnable(
|
|
||||||
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -440,16 +437,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
return deadline0.minimum(deadline1);
|
return deadline0.minimum(deadline1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
|
|
||||||
if (deadline0 == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (deadline1 == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return deadline0.isBefore(deadline1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void request(int numMessages) {
|
public void request(int numMessages) {
|
||||||
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
|
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
|
||||||
|
|
@ -493,7 +480,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
stream.cancel(status);
|
stream.cancel(status);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
removeContextListenerAndCancelDeadlineFuture();
|
cancellationHandler.tearDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -699,10 +686,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// description. Since our timer may be delayed in firing, we double-check the deadline and
|
// description. Since our timer may be delayed in firing, we double-check the deadline and
|
||||||
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
|
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
|
||||||
if (deadline.isExpired()) {
|
if (deadline.isExpired()) {
|
||||||
InsightBuilder insight = new InsightBuilder();
|
status = cancellationHandler.formatDeadlineExceededStatus();
|
||||||
stream.appendTimeoutInsight(insight);
|
|
||||||
status = DEADLINE_EXCEEDED.augmentDescription(
|
|
||||||
"ClientCall was cancelled at or after deadline. " + insight);
|
|
||||||
// Replace trailers to prevent mixing sources of status and trailers.
|
// Replace trailers to prevent mixing sources of status and trailers.
|
||||||
trailers = new Metadata();
|
trailers = new Metadata();
|
||||||
}
|
}
|
||||||
|
|
@ -725,6 +709,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runInternal() {
|
private void runInternal() {
|
||||||
|
cancellationHandler.tearDown();
|
||||||
Status status = savedStatus;
|
Status status = savedStatus;
|
||||||
Metadata trailers = savedTrailers;
|
Metadata trailers = savedTrailers;
|
||||||
if (exceptionStatus != null) {
|
if (exceptionStatus != null) {
|
||||||
|
|
@ -737,11 +722,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// Replace trailers to prevent mixing sources of status and trailers.
|
// Replace trailers to prevent mixing sources of status and trailers.
|
||||||
trailers = new Metadata();
|
trailers = new Metadata();
|
||||||
}
|
}
|
||||||
cancelListenersShouldBeRemoved = true;
|
|
||||||
try {
|
try {
|
||||||
closeObserver(observer, status, trailers);
|
closeObserver(observer, status, trailers);
|
||||||
} finally {
|
} finally {
|
||||||
removeContextListenerAndCancelDeadlineFuture();
|
|
||||||
channelCallsTracer.reportCallEnded(status.isOk());
|
channelCallsTracer.reportCallEnded(status.isOk());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -926,7 +926,7 @@ public class ClientCallImplTest {
|
||||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||||
assertThat(statusCaptor.getValue().getDescription())
|
assertThat(statusCaptor.getValue().getDescription())
|
||||||
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. "
|
.matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||||
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -954,7 +954,9 @@ public class ClientCallImplTest {
|
||||||
|
|
||||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||||
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
|
assertThat(statusCaptor.getValue().getDescription())
|
||||||
|
.matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||||
|
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -1190,7 +1190,7 @@ public abstract class AbstractInteropTest {
|
||||||
assertTrue(desc,
|
assertTrue(desc,
|
||||||
// There is a race between client and server-side deadline expiration.
|
// There is a race between client and server-side deadline expiration.
|
||||||
// If client expires first, it'd generate this message
|
// If client expires first, it'd generate this message
|
||||||
Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
|
Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc)
|
||||||
// If server expires first, it'd reset the stream and client would generate a different
|
// If server expires first, it'd reset the stream and client would generate a different
|
||||||
// message
|
// message
|
||||||
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
|
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue