From e19e8f7d402d9e316a572fce3674d3b57de63c98 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 2 May 2018 13:31:03 -0700 Subject: [PATCH] core: populate effective deadline to ClientStream Added `ClientStream.setDeadline(Deadline deadline)` method, which will set the timeout header. Resolves #4412 --- core/BUILD.bazel | 1 + .../io/grpc/inprocess/InProcessTransport.java | 11 ++ .../grpc/internal/AbstractClientStream.java | 11 ++ .../java/io/grpc/internal/ClientCallImpl.java | 43 +++---- .../java/io/grpc/internal/ClientStream.java | 7 ++ .../java/io/grpc/internal/DelayedStream.java | 11 ++ .../grpc/internal/ForwardingClientStream.java | 6 + .../io/grpc/internal/NoopClientStream.java | 5 + .../io/grpc/internal/RetriableStream.java | 13 +++ .../internal/AbstractClientStreamTest.java | 23 ++++ .../io/grpc/internal/ClientCallImplTest.java | 105 +++++++++++------- .../internal/ForwardingClientStreamTest.java | 2 +- 12 files changed, 166 insertions(+), 72 deletions(-) diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 7d9800887f..0e37ad7778 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -25,6 +25,7 @@ java_library( deps = [ ":core", ":internal", + "//context", "@com_google_code_findbugs_jsr305//jar", "@com_google_guava_guava//jar", ], diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 1bd6e2eecd..d71c2315c6 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,12 +17,15 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; +import static java.lang.Math.max; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.Grpc; @@ -53,6 +56,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.CheckReturnValue; @@ -664,6 +668,13 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @Override public void setMaxOutboundMessageSize(int maxSize) {} + + @Override + public void setDeadline(Deadline deadline) { + headers.discardAll(TIMEOUT_KEY); + long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); + headers.put(TIMEOUT_KEY, effectiveTimeout); + } } } diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index d78402730e..ba82576df6 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -19,17 +19,21 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; +import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; +import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Codec; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.ClientStreamListener.RpcProgress; import java.io.InputStream; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -116,6 +120,13 @@ public abstract class AbstractClientStream extends AbstractStream } } + @Override + public void setDeadline(Deadline deadline) { + headers.discardAll(TIMEOUT_KEY); + long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); + headers.put(TIMEOUT_KEY, effectiveTimeout); + } + @Override public void setMaxOutboundMessageSize(int maxSize) { framer.setMaxOutboundMessageSize(maxSize); diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index b2986b64a5..1a372a1bc8 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -26,7 +26,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; -import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; import com.google.common.annotations.VisibleForTesting; @@ -234,8 +233,8 @@ final class ClientCallImpl extends ClientCall { Deadline effectiveDeadline = effectiveDeadline(); boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); if (!deadlineExceeded) { - updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), - context.getDeadline(), headers); + logIfContextNarrowedTimeout( + effectiveDeadline, callOptions.getDeadline(), context.getDeadline()); if (retryEnabled) { stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context); } else { @@ -262,8 +261,13 @@ final class ClientCallImpl extends ClientCall { if (callOptions.getMaxOutboundMessageSize() != null) { stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize()); } + if (effectiveDeadline != null) { + stream.setDeadline(effectiveDeadline); + } stream.setCompressor(compressor); - stream.setFullStreamDecompression(fullStreamDecompression); + if (fullStreamDecompression) { + stream.setFullStreamDecompression(fullStreamDecompression); + } stream.setDecompressorRegistry(decompressorRegistry); channelCallsTracer.reportCallStarted(); stream.start(new ClientStreamListenerImpl(observer)); @@ -289,34 +293,17 @@ final class ClientCallImpl extends ClientCall { } } - /** - * Based on the deadline, calculate and set the timeout to the given headers. - */ - private static void updateTimeoutHeaders(@Nullable Deadline effectiveDeadline, - @Nullable Deadline callDeadline, @Nullable Deadline outerCallDeadline, Metadata headers) { - headers.discardAll(TIMEOUT_KEY); - - if (effectiveDeadline == null) { + private static void logIfContextNarrowedTimeout( + Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, + @Nullable Deadline callDeadline) { + if (!log.isLoggable(Level.FINE) || effectiveDeadline == null + || outerCallDeadline != effectiveDeadline) { return; } long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS)); - headers.put(TIMEOUT_KEY, effectiveTimeout); - - logIfContextNarrowedTimeout(effectiveTimeout, effectiveDeadline, outerCallDeadline, - callDeadline); - } - - private static void logIfContextNarrowedTimeout(long effectiveTimeout, - Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline, - @Nullable Deadline callDeadline) { - if (!log.isLoggable(Level.FINE) || outerCallDeadline != effectiveDeadline) { - return; - } - - StringBuilder builder = new StringBuilder(); - builder.append(String.format("Call timeout set to '%d' ns, due to context deadline.", - effectiveTimeout)); + StringBuilder builder = new StringBuilder(String.format( + "Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout)); if (callDeadline == null) { builder.append(" Explicit call timeout was not set."); } else { diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java index 217d1bfe5b..c277a3c9e2 100644 --- a/core/src/main/java/io/grpc/internal/ClientStream.java +++ b/core/src/main/java/io/grpc/internal/ClientStream.java @@ -17,8 +17,10 @@ package io.grpc.internal; import io.grpc.Attributes; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.Status; +import javax.annotation.Nonnull; /** * Extension of {@link Stream} to support client-side termination semantics. @@ -86,6 +88,11 @@ public interface ClientStream extends Stream { */ void setMaxOutboundMessageSize(int maxSize); + /** + * Sets the effective deadline of the RPC. + */ + void setDeadline(@Nonnull Deadline deadline); + /** * Attributes that the stream holds at the current moment. */ diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 81bbd22c9f..4a1e2201e8 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.Status; @@ -83,6 +84,16 @@ class DelayedStream implements ClientStream { } } + @Override + public void setDeadline(final Deadline deadline) { + delayOrExecute(new Runnable() { + @Override + public void run() { + realStream.setDeadline(deadline); + } + }); + } + /** * Transfers all pending and future requests and mutations to the given stream. * diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStream.java b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java index 7af41c8ab9..8b389ded70 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStream.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java @@ -19,6 +19,7 @@ package io.grpc.internal; import com.google.common.base.MoreObjects; import io.grpc.Attributes; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.Status; import java.io.InputStream; @@ -96,6 +97,11 @@ abstract class ForwardingClientStream implements ClientStream { delegate().setMaxOutboundMessageSize(maxSize); } + @Override + public void setDeadline(Deadline deadline) { + delegate().setDeadline(deadline); + } + @Override public Attributes getAttributes() { return delegate().getAttributes(); diff --git a/core/src/main/java/io/grpc/internal/NoopClientStream.java b/core/src/main/java/io/grpc/internal/NoopClientStream.java index 6b7da742b8..b066db598a 100644 --- a/core/src/main/java/io/grpc/internal/NoopClientStream.java +++ b/core/src/main/java/io/grpc/internal/NoopClientStream.java @@ -18,9 +18,11 @@ package io.grpc.internal; import io.grpc.Attributes; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.Status; import java.io.InputStream; +import javax.annotation.Nonnull; /** * An implementation of {@link ClientStream} that silently does nothing for the operations. @@ -78,4 +80,7 @@ public class NoopClientStream implements ClientStream { @Override public void setMaxOutboundMessageSize(int maxSize) {} + + @Override + public void setDeadline(@Nonnull Deadline deadline) {} } diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 25742e6a40..d2ae1d666d 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.Compressor; +import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -485,6 +486,18 @@ abstract class RetriableStream implements ClientStream { delayOrExecute(new MaxOutboundMessageSizeEntry()); } + @Override + public final void setDeadline(final Deadline deadline) { + class DeadlineEntry implements BufferEntry { + @Override + public void runWith(Substream substream) { + substream.stream.setDeadline(deadline); + } + } + + delayOrExecute(new DeadlineEntry()); + } + @Override public final Attributes getAttributes() { if (state.winningSubstream != null) { diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index c6ca66ba94..d9b9edd47e 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import io.grpc.Attributes; import io.grpc.Codec; +import io.grpc.Deadline; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; @@ -44,6 +45,7 @@ import io.grpc.internal.testing.TestClientStreamTracer; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -380,6 +382,27 @@ public class AbstractClientStreamTest { verify(input).close(); } + @Test + public void deadlineTimeoutPopulatedToHeaders() { + AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class); + ClientStream stream = new BaseAbstractClientStream( + allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx, + transportTracer); + + stream.setDeadline(Deadline.after(1, TimeUnit.SECONDS)); + stream.start(mockListener); + + ArgumentCaptor headersCaptor = ArgumentCaptor.forClass(Metadata.class); + verify(sink).writeHeaders(headersCaptor.capture(), any(byte[].class)); + + Metadata headers = headersCaptor.getValue(); + assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); + assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue()) + .isLessThan(TimeUnit.SECONDS.toNanos(1)); + assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue()) + .isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600)); + } + /** * No-op base class for testing. */ diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 8d1fd8abe8..f040410dee 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -21,7 +21,6 @@ import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -671,11 +670,10 @@ public class ClientCallImplTest { } @Test - public void contextDeadlineShouldBePropagatedInMetadata() { - long deadlineNanos = TimeUnit.SECONDS.toNanos(1); - Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, - deadlineCancellationExecutor); - context.attach(); + public void contextDeadlineShouldBePropagatedToStream() { + Context context = Context.current() + .withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor); + Context origContext = context.attach(); ClientCallImpl call = new ClientCallImpl( method, @@ -685,27 +683,23 @@ public class ClientCallImplTest { deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */); + call.start(callListener, new Metadata()); - Metadata headers = new Metadata(); + context.detach(origContext); - call.start(callListener, headers); + ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); + verify(stream).setDeadline(deadlineCaptor.capture()); - assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); - Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); - assertNotNull(timeout); - - long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); - assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos); + assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000); } @Test - public void contextDeadlineShouldOverrideLargerMetadataTimeout() { - long deadlineNanos = TimeUnit.SECONDS.toNanos(1); - Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, - deadlineCancellationExecutor); - context.attach(); + public void contextDeadlineShouldOverrideLargerCallOptionsDeadline() { + Context context = Context.current() + .withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor); + Context origContext = context.attach(); - CallOptions callOpts = baseCallOptions.withDeadlineAfter(2, TimeUnit.SECONDS); + CallOptions callOpts = baseCallOptions.withDeadlineAfter(2000, TimeUnit.MILLISECONDS); ClientCallImpl call = new ClientCallImpl( method, MoreExecutors.directExecutor(), @@ -714,27 +708,23 @@ public class ClientCallImplTest { deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */); + call.start(callListener, new Metadata()); - Metadata headers = new Metadata(); + context.detach(origContext); - call.start(callListener, headers); + ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); + verify(stream).setDeadline(deadlineCaptor.capture()); - assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); - Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); - assertNotNull(timeout); - - long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); - assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos); + assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000); } @Test - public void contextDeadlineShouldNotOverrideSmallerMetadataTimeout() { - long deadlineNanos = TimeUnit.SECONDS.toNanos(2); - Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS, - deadlineCancellationExecutor); - context.attach(); + public void contextDeadlineShouldNotOverrideSmallerCallOptionsDeadline() { + Context context = Context.current() + .withDeadlineAfter(2000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor); + Context origContext = context.attach(); - CallOptions callOpts = baseCallOptions.withDeadlineAfter(1, TimeUnit.SECONDS); + CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS); ClientCallImpl call = new ClientCallImpl( method, MoreExecutors.directExecutor(), @@ -743,18 +733,48 @@ public class ClientCallImplTest { deadlineCancellationExecutor, channelCallTracer, false /* retryEnabled */); + call.start(callListener, new Metadata()); - Metadata headers = new Metadata(); + context.detach(origContext); - call.start(callListener, headers); + ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); + verify(stream).setDeadline(deadlineCaptor.capture()); - assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY)); - Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY); - assertNotNull(timeout); + assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000); + } - long callOptsNanos = TimeUnit.SECONDS.toNanos(1); - long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400); - assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos); + @Test + public void callOptionsDeadlineShouldBePropagatedToStream() { + CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS); + ClientCallImpl call = new ClientCallImpl( + method, + MoreExecutors.directExecutor(), + callOpts, + provider, + deadlineCancellationExecutor, + channelCallTracer, + false /* retryEnabled */); + call.start(callListener, new Metadata()); + + ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); + verify(stream).setDeadline(deadlineCaptor.capture()); + + assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000); + } + + @Test + public void noDeadlineShouldBePropagatedToStream() { + ClientCallImpl call = new ClientCallImpl( + method, + MoreExecutors.directExecutor(), + baseCallOptions, + provider, + deadlineCancellationExecutor, + channelCallTracer, + false /* retryEnabled */); + call.start(callListener, new Metadata()); + + verify(stream, never()).setDeadline(any(Deadline.class)); } @Test @@ -941,4 +961,3 @@ public class ClientCallImplTest { } } } - diff --git a/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java b/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java index fca177d840..3023d2744e 100644 --- a/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java @@ -137,7 +137,7 @@ public class ForwardingClientStreamTest { public void setMaxInboundMessageSizeTest() { int size = 4567; forward.setMaxInboundMessageSize(size); - verify(mock).setMaxInboundMessageSize(size);; + verify(mock).setMaxInboundMessageSize(size); } @Test