mirror of https://github.com/grpc/grpc-java.git
Revert "core: Provide DEADLINE_EXCEEDED insights for context deadline" (#11024)
This reverts commit 0e31ac9303.
This commit is contained in:
parent
38f968fafb
commit
68eb639b1c
|
|
@ -28,6 +28,7 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
|
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
|
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
|
||||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
||||||
|
import static java.lang.Math.max;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
|
|
@ -61,7 +62,6 @@ import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
@ -82,13 +82,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
private final boolean callExecutorIsDirect;
|
private final boolean callExecutorIsDirect;
|
||||||
private final CallTracer channelCallsTracer;
|
private final CallTracer channelCallsTracer;
|
||||||
private final Context context;
|
private final Context context;
|
||||||
private CancellationHandler cancellationHandler;
|
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
||||||
private final boolean unaryRequest;
|
private final boolean unaryRequest;
|
||||||
private CallOptions callOptions;
|
private CallOptions callOptions;
|
||||||
private ClientStream stream;
|
private ClientStream stream;
|
||||||
|
private volatile boolean cancelListenersShouldBeRemoved;
|
||||||
private boolean cancelCalled;
|
private boolean cancelCalled;
|
||||||
private boolean halfCloseCalled;
|
private boolean halfCloseCalled;
|
||||||
private final ClientStreamProvider clientStreamProvider;
|
private final ClientStreamProvider clientStreamProvider;
|
||||||
|
private final ContextCancellationListener cancellationListener =
|
||||||
|
new ContextCancellationListener();
|
||||||
private final ScheduledExecutorService deadlineCancellationExecutor;
|
private final ScheduledExecutorService deadlineCancellationExecutor;
|
||||||
private boolean fullStreamDecompression;
|
private boolean fullStreamDecompression;
|
||||||
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
||||||
|
|
@ -125,6 +128,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
PerfMark.event("ClientCall.<init>", tag);
|
PerfMark.event("ClientCall.<init>", tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class ContextCancellationListener implements CancellationListener {
|
||||||
|
@Override
|
||||||
|
public void cancelled(Context context) {
|
||||||
|
stream.cancel(statusFromCancelled(context));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provider of {@link ClientStream}s.
|
* Provider of {@link ClientStream}s.
|
||||||
*/
|
*/
|
||||||
|
|
@ -242,21 +252,21 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
|
prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);
|
||||||
|
|
||||||
Deadline effectiveDeadline = effectiveDeadline();
|
Deadline effectiveDeadline = effectiveDeadline();
|
||||||
boolean contextIsDeadlineSource = effectiveDeadline != null
|
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
|
||||||
&& effectiveDeadline.equals(context.getDeadline());
|
|
||||||
cancellationHandler = new CancellationHandler(effectiveDeadline, contextIsDeadlineSource);
|
|
||||||
boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler.remainingNanos <= 0;
|
|
||||||
if (!deadlineExceeded) {
|
if (!deadlineExceeded) {
|
||||||
|
logIfContextNarrowedTimeout(
|
||||||
|
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
|
||||||
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
|
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
|
||||||
} else {
|
} else {
|
||||||
ClientStreamTracer[] tracers =
|
ClientStreamTracer[] tracers =
|
||||||
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
|
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
|
||||||
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
|
String deadlineName =
|
||||||
|
isFirstMin(callOptions.getDeadline(), context.getDeadline()) ? "CallOptions" : "Context";
|
||||||
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||||
String description = String.format(
|
String description = String.format(
|
||||||
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
|
"ClientCall started after %s deadline was exceeded %.9f seconds ago. "
|
||||||
+ "Name resolution delay %.9f seconds.", deadlineName,
|
+ "Name resolution delay %.9f seconds.", deadlineName,
|
||||||
cancellationHandler.remainingNanos / NANO_TO_SECS,
|
effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS) / NANO_TO_SECS,
|
||||||
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
|
nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS);
|
||||||
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
|
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(description), tracers);
|
||||||
}
|
}
|
||||||
|
|
@ -288,7 +298,21 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// they receive cancel before start. Issue #1343 has more details
|
// they receive cancel before start. Issue #1343 has more details
|
||||||
|
|
||||||
// Propagate later Context cancellation to the remote side.
|
// Propagate later Context cancellation to the remote side.
|
||||||
cancellationHandler.setUp();
|
context.addListener(cancellationListener, directExecutor());
|
||||||
|
if (effectiveDeadline != null
|
||||||
|
// If the context has the effective deadline, we don't need to schedule an extra task.
|
||||||
|
&& !effectiveDeadline.equals(context.getDeadline())
|
||||||
|
// If the channel has been terminated, we don't need to schedule an extra task.
|
||||||
|
&& deadlineCancellationExecutor != null) {
|
||||||
|
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
|
||||||
|
}
|
||||||
|
if (cancelListenersShouldBeRemoved) {
|
||||||
|
// Race detected! ClientStreamListener.closed may have been called before
|
||||||
|
// deadlineCancellationFuture was set / context listener added, thereby preventing the future
|
||||||
|
// and listener from being cancelled. Go ahead and cancel again, just to be sure it
|
||||||
|
// was cancelled.
|
||||||
|
removeContextListenerAndCancelDeadlineFuture();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyMethodConfig() {
|
private void applyMethodConfig() {
|
||||||
|
|
@ -330,76 +354,53 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class CancellationHandler implements Runnable, CancellationListener {
|
private static void logIfContextNarrowedTimeout(
|
||||||
private final boolean contextIsDeadlineSource;
|
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
|
||||||
private final boolean hasDeadline;
|
@Nullable Deadline callDeadline) {
|
||||||
private final long remainingNanos;
|
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|
||||||
private volatile ScheduledFuture<?> deadlineCancellationFuture;
|
|| !effectiveDeadline.equals(outerCallDeadline)) {
|
||||||
private volatile boolean tearDownCalled;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
CancellationHandler(Deadline deadline, boolean contextIsDeadlineSource) {
|
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
|
||||||
this.contextIsDeadlineSource = contextIsDeadlineSource;
|
StringBuilder builder = new StringBuilder(String.format(
|
||||||
if (deadline == null) {
|
Locale.US,
|
||||||
hasDeadline = false;
|
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
|
||||||
remainingNanos = 0;
|
if (callDeadline == null) {
|
||||||
|
builder.append(" Explicit call timeout was not set.");
|
||||||
} else {
|
} else {
|
||||||
hasDeadline = true;
|
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
|
||||||
remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
|
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
log.fine(builder.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeContextListenerAndCancelDeadlineFuture() {
|
||||||
|
context.removeListener(cancellationListener);
|
||||||
|
ScheduledFuture<?> f = deadlineCancellationFuture;
|
||||||
|
if (f != null) {
|
||||||
|
f.cancel(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setUp() {
|
private class DeadlineTimer implements Runnable {
|
||||||
if (tearDownCalled) {
|
private final long remainingNanos;
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (hasDeadline
|
|
||||||
// If the context has the effective deadline, we don't need to schedule an extra task.
|
|
||||||
&& !contextIsDeadlineSource
|
|
||||||
// If the channel has been terminated, we don't need to schedule an extra task.
|
|
||||||
&& deadlineCancellationExecutor != null) {
|
|
||||||
deadlineCancellationFuture = deadlineCancellationExecutor.schedule(
|
|
||||||
new LogExceptionRunnable(this), remainingNanos, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
|
||||||
context.addListener(this, directExecutor());
|
|
||||||
if (tearDownCalled) {
|
|
||||||
// Race detected! Re-run to make sure the future is cancelled and context listener removed
|
|
||||||
tearDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// May be called multiple times, and race with setUp()
|
DeadlineTimer(long remainingNanos) {
|
||||||
void tearDown() {
|
this.remainingNanos = remainingNanos;
|
||||||
tearDownCalled = true;
|
|
||||||
ScheduledFuture<?> deadlineCancellationFuture = this.deadlineCancellationFuture;
|
|
||||||
if (deadlineCancellationFuture != null) {
|
|
||||||
deadlineCancellationFuture.cancel(false);
|
|
||||||
}
|
|
||||||
context.removeListener(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancelled(Context context) {
|
|
||||||
if (hasDeadline && contextIsDeadlineSource
|
|
||||||
&& context.cancellationCause() instanceof TimeoutException) {
|
|
||||||
stream.cancel(formatDeadlineExceededStatus());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
stream.cancel(statusFromCancelled(context));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
stream.cancel(formatDeadlineExceededStatus());
|
InsightBuilder insight = new InsightBuilder();
|
||||||
}
|
stream.appendTimeoutInsight(insight);
|
||||||
|
|
||||||
Status formatDeadlineExceededStatus() {
|
|
||||||
// DelayedStream.cancel() is safe to call from a thread that is different from where the
|
// DelayedStream.cancel() is safe to call from a thread that is different from where the
|
||||||
// stream is created.
|
// stream is created.
|
||||||
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
|
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
|
||||||
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
|
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);
|
||||||
|
|
||||||
StringBuilder buf = new StringBuilder();
|
StringBuilder buf = new StringBuilder();
|
||||||
buf.append(contextIsDeadlineSource ? "Context" : "CallOptions");
|
|
||||||
buf.append("deadline exceeded after ");
|
buf.append("deadline exceeded after ");
|
||||||
if (remainingNanos < 0) {
|
if (remainingNanos < 0) {
|
||||||
buf.append('-');
|
buf.append('-');
|
||||||
|
|
@ -410,14 +411,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
Long nsDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
|
||||||
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
|
buf.append(String.format(Locale.US, "Name resolution delay %.9f seconds. ",
|
||||||
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
|
nsDelay == null ? 0 : nsDelay / NANO_TO_SECS));
|
||||||
if (stream != null) {
|
|
||||||
InsightBuilder insight = new InsightBuilder();
|
|
||||||
stream.appendTimeoutInsight(insight);
|
|
||||||
buf.append(" ");
|
|
||||||
buf.append(insight);
|
buf.append(insight);
|
||||||
|
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
|
||||||
}
|
}
|
||||||
return DEADLINE_EXCEEDED.withDescription(buf.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
|
||||||
|
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
|
||||||
|
return deadlineCancellationExecutor.schedule(
|
||||||
|
new LogExceptionRunnable(
|
||||||
|
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -437,6 +440,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
return deadline0.minimum(deadline1);
|
return deadline0.minimum(deadline1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isFirstMin(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
|
||||||
|
if (deadline0 == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (deadline1 == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return deadline0.isBefore(deadline1);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void request(int numMessages) {
|
public void request(int numMessages) {
|
||||||
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
|
try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) {
|
||||||
|
|
@ -480,7 +493,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
stream.cancel(status);
|
stream.cancel(status);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
cancellationHandler.tearDown();
|
removeContextListenerAndCancelDeadlineFuture();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -686,7 +699,10 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// description. Since our timer may be delayed in firing, we double-check the deadline and
|
// description. Since our timer may be delayed in firing, we double-check the deadline and
|
||||||
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
|
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
|
||||||
if (deadline.isExpired()) {
|
if (deadline.isExpired()) {
|
||||||
status = cancellationHandler.formatDeadlineExceededStatus();
|
InsightBuilder insight = new InsightBuilder();
|
||||||
|
stream.appendTimeoutInsight(insight);
|
||||||
|
status = DEADLINE_EXCEEDED.augmentDescription(
|
||||||
|
"ClientCall was cancelled at or after deadline. " + insight);
|
||||||
// Replace trailers to prevent mixing sources of status and trailers.
|
// Replace trailers to prevent mixing sources of status and trailers.
|
||||||
trailers = new Metadata();
|
trailers = new Metadata();
|
||||||
}
|
}
|
||||||
|
|
@ -709,7 +725,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runInternal() {
|
private void runInternal() {
|
||||||
cancellationHandler.tearDown();
|
|
||||||
Status status = savedStatus;
|
Status status = savedStatus;
|
||||||
Metadata trailers = savedTrailers;
|
Metadata trailers = savedTrailers;
|
||||||
if (exceptionStatus != null) {
|
if (exceptionStatus != null) {
|
||||||
|
|
@ -722,9 +737,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
// Replace trailers to prevent mixing sources of status and trailers.
|
// Replace trailers to prevent mixing sources of status and trailers.
|
||||||
trailers = new Metadata();
|
trailers = new Metadata();
|
||||||
}
|
}
|
||||||
|
cancelListenersShouldBeRemoved = true;
|
||||||
try {
|
try {
|
||||||
closeObserver(observer, status, trailers);
|
closeObserver(observer, status, trailers);
|
||||||
} finally {
|
} finally {
|
||||||
|
removeContextListenerAndCancelDeadlineFuture();
|
||||||
channelCallsTracer.reportCallEnded(status.isOk());
|
channelCallsTracer.reportCallEnded(status.isOk());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -926,7 +926,7 @@ public class ClientCallImplTest {
|
||||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||||
assertThat(statusCaptor.getValue().getDescription())
|
assertThat(statusCaptor.getValue().getDescription())
|
||||||
.matches("CallOptions deadline exceeded after [0-9]+\\.[0-9]+s. "
|
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. "
|
||||||
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -954,9 +954,7 @@ public class ClientCallImplTest {
|
||||||
|
|
||||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
|
||||||
assertThat(statusCaptor.getValue().getDescription())
|
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
|
||||||
.matches("Context deadline exceeded after [0-9]+\\.[0-9]+s. "
|
|
||||||
+ "Name resolution delay 0.000000000 seconds. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -1190,7 +1190,7 @@ public abstract class AbstractInteropTest {
|
||||||
assertTrue(desc,
|
assertTrue(desc,
|
||||||
// There is a race between client and server-side deadline expiration.
|
// There is a race between client and server-side deadline expiration.
|
||||||
// If client expires first, it'd generate this message
|
// If client expires first, it'd generate this message
|
||||||
Pattern.matches("CallOptions deadline exceeded after .*s. \\[.*\\]", desc)
|
Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
|
||||||
// If server expires first, it'd reset the stream and client would generate a different
|
// If server expires first, it'd reset the stream and client would generate a different
|
||||||
// message
|
// message
|
||||||
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
|
|| desc.startsWith("ClientCall was cancelled at or after deadline."));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue