diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index 03387a4758..4da16f060e 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -326,7 +326,8 @@ public final class ChannelImpl extends Channel { @Override public void start(Listener observer, Metadata.Headers headers) { Preconditions.checkState(stream == null, "Already started"); - ClientStreamListener listener = new ClientStreamListenerImpl(observer); + Long deadlineNanoTime = callOptions.getDeadlineNanoTime(); + ClientStreamListener listener = new ClientStreamListenerImpl(observer, deadlineNanoTime); ClientTransport transport; try { transport = obtainActiveTransport(); @@ -343,7 +344,6 @@ public final class ChannelImpl extends Channel { headers.removeAll(TIMEOUT_KEY); // Convert the deadline to timeout. Timeout is more favorable than deadline on the wire // because timeout tolerates the clock difference between machines. - Long deadlineNanoTime = callOptions.getDeadlineNanoTime(); long timeoutMicros = 0; if (deadlineNanoTime != null) { timeoutMicros = TimeUnit.NANOSECONDS.toMicros(deadlineNanoTime - System.nanoTime()); @@ -441,11 +441,13 @@ public final class ChannelImpl extends Channel { private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; + private final Long deadlineNanoTime; private boolean closed; - public ClientStreamListenerImpl(Listener observer) { + public ClientStreamListenerImpl(Listener observer, Long deadlineNanoTime) { Preconditions.checkNotNull(observer); this.observer = observer; + this.deadlineNanoTime = deadlineNanoTime; } @Override @@ -491,7 +493,20 @@ public final class ChannelImpl extends Channel { } @Override - public void closed(final Status status, final Metadata.Trailers trailers) { + public void closed(Status status, Metadata.Trailers trailers) { + if (status.getCode() == Status.Code.CANCELLED && deadlineNanoTime != null) { + // When the server's deadline expires, it can only reset the stream with CANCEL and no + // description. Since our timer may be delayed in firing, we double-check the deadline and + // turn the failure into the likely more helpful DEADLINE_EXCEEDED status. This is always + // safe, but we avoid wasting resources getting the nanoTime() when unnecessary. + if (deadlineNanoTime <= System.nanoTime()) { + status = Status.DEADLINE_EXCEEDED; + // Replace trailers to prevent mixing sources of status and trailers. + trailers = new Metadata.Trailers(); + } + } + final Status savedStatus = status; + final Metadata.Trailers savedTrailers = trailers; callExecutor.execute(new Runnable() { @Override public void run() { @@ -501,7 +516,7 @@ public final class ChannelImpl extends Channel { if (future != null) { future.cancel(false); } - observer.onClose(status, trailers); + observer.onClose(savedStatus, savedTrailers); } }); }