core: populate effective deadline to ClientStream

Added `ClientStream.setDeadline(Deadline deadline)` method, which will set the timeout header.

Resolves #4412
This commit is contained in:
ZHANG Dapeng 2018-05-02 13:31:03 -07:00 committed by GitHub
parent d50c191aca
commit e19e8f7d40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 166 additions and 72 deletions

View File

@ -25,6 +25,7 @@ java_library(
deps = [
":core",
":internal",
"//context",
"@com_google_code_findbugs_jsr305//jar",
"@com_google_guava_guava//jar",
],

View File

@ -17,12 +17,15 @@
package io.grpc.inprocess;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Grpc;
@ -53,6 +56,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
@ -664,6 +668,13 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
@Override
public void setMaxOutboundMessageSize(int maxSize) {}
@Override
public void setDeadline(Deadline deadline) {
headers.discardAll(TIMEOUT_KEY);
long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
headers.put(TIMEOUT_KEY, effectiveTimeout);
}
}
}

View File

@ -19,17 +19,21 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -116,6 +120,13 @@ public abstract class AbstractClientStream extends AbstractStream
}
}
@Override
public void setDeadline(Deadline deadline) {
headers.discardAll(TIMEOUT_KEY);
long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
headers.put(TIMEOUT_KEY, effectiveTimeout);
}
@Override
public void setMaxOutboundMessageSize(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);

View File

@ -26,7 +26,6 @@ import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting;
@ -234,8 +233,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) {
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
context.getDeadline(), headers);
logIfContextNarrowedTimeout(
effectiveDeadline, callOptions.getDeadline(), context.getDeadline());
if (retryEnabled) {
stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
} else {
@ -262,8 +261,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
if (effectiveDeadline != null) {
stream.setDeadline(effectiveDeadline);
}
stream.setCompressor(compressor);
stream.setFullStreamDecompression(fullStreamDecompression);
if (fullStreamDecompression) {
stream.setFullStreamDecompression(fullStreamDecompression);
}
stream.setDecompressorRegistry(decompressorRegistry);
channelCallsTracer.reportCallStarted();
stream.start(new ClientStreamListenerImpl(observer));
@ -289,34 +293,17 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
}
/**
* Based on the deadline, calculate and set the timeout to the given headers.
*/
private static void updateTimeoutHeaders(@Nullable Deadline effectiveDeadline,
@Nullable Deadline callDeadline, @Nullable Deadline outerCallDeadline, Metadata headers) {
headers.discardAll(TIMEOUT_KEY);
if (effectiveDeadline == null) {
private static void logIfContextNarrowedTimeout(
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
@Nullable Deadline callDeadline) {
if (!log.isLoggable(Level.FINE) || effectiveDeadline == null
|| outerCallDeadline != effectiveDeadline) {
return;
}
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
headers.put(TIMEOUT_KEY, effectiveTimeout);
logIfContextNarrowedTimeout(effectiveTimeout, effectiveDeadline, outerCallDeadline,
callDeadline);
}
private static void logIfContextNarrowedTimeout(long effectiveTimeout,
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
@Nullable Deadline callDeadline) {
if (!log.isLoggable(Level.FINE) || outerCallDeadline != effectiveDeadline) {
return;
}
StringBuilder builder = new StringBuilder();
builder.append(String.format("Call timeout set to '%d' ns, due to context deadline.",
effectiveTimeout));
StringBuilder builder = new StringBuilder(String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {

View File

@ -17,8 +17,10 @@
package io.grpc.internal;
import io.grpc.Attributes;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
import javax.annotation.Nonnull;
/**
* Extension of {@link Stream} to support client-side termination semantics.
@ -86,6 +88,11 @@ public interface ClientStream extends Stream {
*/
void setMaxOutboundMessageSize(int maxSize);
/**
* Sets the effective deadline of the RPC.
*/
void setDeadline(@Nonnull Deadline deadline);
/**
* Attributes that the stream holds at the current moment.
*/

View File

@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
@ -83,6 +84,16 @@ class DelayedStream implements ClientStream {
}
}
@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
}
});
}
/**
* Transfers all pending and future requests and mutations to the given stream.
*

View File

@ -19,6 +19,7 @@ package io.grpc.internal;
import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
import java.io.InputStream;
@ -96,6 +97,11 @@ abstract class ForwardingClientStream implements ClientStream {
delegate().setMaxOutboundMessageSize(maxSize);
}
@Override
public void setDeadline(Deadline deadline) {
delegate().setDeadline(deadline);
}
@Override
public Attributes getAttributes() {
return delegate().getAttributes();

View File

@ -18,9 +18,11 @@ package io.grpc.internal;
import io.grpc.Attributes;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
import java.io.InputStream;
import javax.annotation.Nonnull;
/**
* An implementation of {@link ClientStream} that silently does nothing for the operations.
@ -78,4 +80,7 @@ public class NoopClientStream implements ClientStream {
@Override
public void setMaxOutboundMessageSize(int maxSize) {}
@Override
public void setDeadline(@Nonnull Deadline deadline) {}
}

View File

@ -25,6 +25,7 @@ import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -485,6 +486,18 @@ abstract class RetriableStream<ReqT> implements ClientStream {
delayOrExecute(new MaxOutboundMessageSizeEntry());
}
@Override
public final void setDeadline(final Deadline deadline) {
class DeadlineEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.setDeadline(deadline);
}
}
delayOrExecute(new DeadlineEntry());
}
@Override
public final Attributes getAttributes() {
if (state.winningSubstream != null) {

View File

@ -34,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import io.grpc.Attributes;
import io.grpc.Codec;
import io.grpc.Deadline;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
@ -44,6 +45,7 @@ import io.grpc.internal.testing.TestClientStreamTracer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -380,6 +382,27 @@ public class AbstractClientStreamTest {
verify(input).close();
}
@Test
public void deadlineTimeoutPopulatedToHeaders() {
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
ClientStream stream = new BaseAbstractClientStream(
allocator, new BaseTransportState(statsTraceCtx, transportTracer), sink, statsTraceCtx,
transportTracer);
stream.setDeadline(Deadline.after(1, TimeUnit.SECONDS));
stream.start(mockListener);
ArgumentCaptor<Metadata> headersCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(sink).writeHeaders(headersCaptor.capture(), any(byte[].class));
Metadata headers = headersCaptor.getValue();
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isLessThan(TimeUnit.SECONDS.toNanos(1));
assertThat(headers.get(GrpcUtil.TIMEOUT_KEY).longValue())
.isGreaterThan(TimeUnit.MILLISECONDS.toNanos(600));
}
/**
* No-op base class for testing.
*/

View File

@ -21,7 +21,6 @@ import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@ -671,11 +670,10 @@ public class ClientCallImplTest {
}
@Test
public void contextDeadlineShouldBePropagatedInMetadata() {
long deadlineNanos = TimeUnit.SECONDS.toNanos(1);
Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS,
deadlineCancellationExecutor);
context.attach();
public void contextDeadlineShouldBePropagatedToStream() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
@ -685,27 +683,23 @@ public class ClientCallImplTest {
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
Metadata headers = new Metadata();
context.detach(origContext);
call.start(callListener, headers);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY);
assertNotNull(timeout);
long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400);
assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos);
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void contextDeadlineShouldOverrideLargerMetadataTimeout() {
long deadlineNanos = TimeUnit.SECONDS.toNanos(1);
Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS,
deadlineCancellationExecutor);
context.attach();
public void contextDeadlineShouldOverrideLargerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(2, TimeUnit.SECONDS);
CallOptions callOpts = baseCallOptions.withDeadlineAfter(2000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
@ -714,27 +708,23 @@ public class ClientCallImplTest {
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
Metadata headers = new Metadata();
context.detach(origContext);
call.start(callListener, headers);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY);
assertNotNull(timeout);
long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400);
assertTimeoutBetween(timeout, deadlineNanos - deltaNanos, deadlineNanos);
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void contextDeadlineShouldNotOverrideSmallerMetadataTimeout() {
long deadlineNanos = TimeUnit.SECONDS.toNanos(2);
Context context = Context.current().withDeadlineAfter(deadlineNanos, TimeUnit.NANOSECONDS,
deadlineCancellationExecutor);
context.attach();
public void contextDeadlineShouldNotOverrideSmallerCallOptionsDeadline() {
Context context = Context.current()
.withDeadlineAfter(2000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1, TimeUnit.SECONDS);
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
@ -743,18 +733,48 @@ public class ClientCallImplTest {
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
Metadata headers = new Metadata();
context.detach(origContext);
call.start(callListener, headers);
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTrue(headers.containsKey(GrpcUtil.TIMEOUT_KEY));
Long timeout = headers.get(GrpcUtil.TIMEOUT_KEY);
assertNotNull(timeout);
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
long callOptsNanos = TimeUnit.SECONDS.toNanos(1);
long deltaNanos = TimeUnit.MILLISECONDS.toNanos(400);
assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos);
@Test
public void callOptionsDeadlineShouldBePropagatedToStream() {
CallOptions callOpts = baseCallOptions.withDeadlineAfter(1000, TimeUnit.MILLISECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
callOpts,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
verify(stream).setDeadline(deadlineCaptor.capture());
assertTimeoutBetween(deadlineCaptor.getValue().timeRemaining(TimeUnit.MILLISECONDS), 600, 1000);
}
@Test
public void noDeadlineShouldBePropagatedToStream() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */);
call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class));
}
@Test
@ -941,4 +961,3 @@ public class ClientCallImplTest {
}
}
}

View File

@ -137,7 +137,7 @@ public class ForwardingClientStreamTest {
public void setMaxInboundMessageSizeTest() {
int size = 4567;
forward.setMaxInboundMessageSize(size);
verify(mock).setMaxInboundMessageSize(size);;
verify(mock).setMaxInboundMessageSize(size);
}
@Test