core: don't create a new context for each client call. Fixes #1926

Also, undo the hack that makes sure AsyncClient's stack doesn't overflow as it's no longer needed.
This commit is contained in:
Jakob Buchgraber 2016-06-16 16:02:17 +02:00 committed by GitHub
parent 26fb347f40
commit 63d8274661
3 changed files with 116 additions and 35 deletions

View File

@ -54,7 +54,6 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
@ -199,16 +198,11 @@ public class AsyncClient {
histogram.recordValue((now - lastCall) / 1000); histogram.recordValue((now - lastCall) / 1000);
lastCall = now; lastCall = now;
Context prevCtx = Context.ROOT.attach();
try {
if (endTime > now) { if (endTime > now) {
stub.unaryCall(request, this); stub.unaryCall(request, this);
} else { } else {
future.done(); future.done();
} }
} finally {
Context.current().detach(prevCtx);
}
} }
}); });

View File

@ -63,6 +63,7 @@ import java.io.InputStream;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -79,12 +80,12 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
private final MethodDescriptor<ReqT, RespT> method; private final MethodDescriptor<ReqT, RespT> method;
private final Executor callExecutor; private final Executor callExecutor;
private final Context parentContext; private final Context context;
private volatile Context context; private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest; private final boolean unaryRequest;
private final CallOptions callOptions; private final CallOptions callOptions;
private ClientStream stream; private ClientStream stream;
private volatile boolean contextListenerShouldBeRemoved; private volatile boolean cancelListenersShouldBeRemoved;
private boolean cancelCalled; private boolean cancelCalled;
private boolean halfCloseCalled; private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider; private final ClientTransportProvider clientTransportProvider;
@ -103,7 +104,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
? new SerializeReentrantCallsDirectExecutor() ? new SerializeReentrantCallsDirectExecutor()
: new SerializingExecutor(executor); : new SerializingExecutor(executor);
// Propagate the context from the thread which initiated the call to all callbacks. // Propagate the context from the thread which initiated the call to all callbacks.
this.parentContext = Context.current(); this.context = Context.current();
this.unaryRequest = method.getType() == MethodType.UNARY this.unaryRequest = method.getType() == MethodType.UNARY
|| method.getType() == MethodType.SERVER_STREAMING; || method.getType() == MethodType.SERVER_STREAMING;
this.callOptions = callOptions; this.callOptions = callOptions;
@ -157,14 +158,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
checkNotNull(observer, "observer"); checkNotNull(observer, "observer");
checkNotNull(headers, "headers"); checkNotNull(headers, "headers");
// Create the context
final Deadline effectiveDeadline = min(callOptions.getDeadline(), parentContext.getDeadline());
if (effectiveDeadline != parentContext.getDeadline()) {
context = parentContext.withDeadline(effectiveDeadline, deadlineCancellationExecutor);
} else {
context = parentContext.withCancellation();
}
if (context.isCancelled()) { if (context.isCancelled()) {
// Context is already cancelled so no need to create a real stream, just notify the observer // Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor // of cancellation via callback on the executor
@ -200,10 +193,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
prepareHeaders(headers, decompressorRegistry, compressor); prepareHeaders(headers, decompressorRegistry, compressor);
final boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) { if (!deadlineExceeded) {
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
parentContext.getDeadline(), headers); context.getDeadline(), headers);
ClientTransport transport = clientTransportProvider.get(callOptions); ClientTransport transport = clientTransportProvider.get(callOptions);
stream = transport.newStream(method, headers, callOptions); stream = transport.newStream(method, headers, callOptions);
} else { } else {
@ -214,7 +208,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
stream.setAuthority(callOptions.getAuthority()); stream.setAuthority(callOptions.getAuthority());
} }
stream.setCompressor(compressor); stream.setCompressor(compressor);
stream.start(new ClientStreamListenerImpl(observer)); stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if // Delay any sources of cancellation after start(), because most of the transports are broken if
@ -222,11 +215,17 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
// Propagate later Context cancellation to the remote side. // Propagate later Context cancellation to the remote side.
context.addListener(this, directExecutor()); context.addListener(this, directExecutor());
if (contextListenerShouldBeRemoved) { if (effectiveDeadline != null
// If the context has the effective deadline, we don't need to schedule an extra task.
&& context.getDeadline() != effectiveDeadline) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before // Race detected! ClientStreamListener.closed may have been called before
// deadlineCancellationFuture was set, thereby preventing the future from being cancelled. // deadlineCancellationFuture was set / context listener added, thereby preventing the future
// Go ahead and cancel again, just to be sure it was cancelled. // and listener from being cancelled. Go ahead and cancel again, just to be sure it
context.removeListener(this); // was cancelled.
removeContextListenerAndCancelDeadlineFuture();
} }
} }
@ -268,6 +267,33 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
log.info(builder.toString()); log.info(builder.toString());
} }
private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(this);
ScheduledFuture<?> f = deadlineCancellationFuture;
if (f != null) {
f.cancel(false);
}
}
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
return deadlineCancellationExecutor.schedule(new LogExceptionRunnable(
new Runnable() {
@Override
public void run() {
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
stream.cancel(DEADLINE_EXCEEDED);
}
}), deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
@Nullable
private Deadline effectiveDeadline() {
// Call options and context are immutable, so we don't need to cache the deadline.
return min(callOptions.getDeadline(), context.getDeadline());
}
@Nullable
private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) { if (deadline0 == null) {
return deadline1; return deadline1;
@ -311,9 +337,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
stream.cancel(status); stream.cancel(status);
} }
} finally { } finally {
if (context != null) { removeContextListenerAndCancelDeadlineFuture();
context.removeListener(ClientCallImpl.this);
}
} }
} }
@ -422,7 +446,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
@Override @Override
public void closed(Status status, Metadata trailers) { public void closed(Status status, Metadata trailers) {
Deadline deadline = context.getDeadline(); Deadline deadline = effectiveDeadline();
if (status.getCode() == Status.Code.CANCELLED && deadline != null) { if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
// When the server's deadline expires, it can only reset the stream with CANCEL and no // When the server's deadline expires, it can only reset the stream with CANCEL and no
// description. Since our timer may be delayed in firing, we double-check the deadline and // description. Since our timer may be delayed in firing, we double-check the deadline and
@ -440,10 +464,10 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
public final void runInContext() { public final void runInContext() {
try { try {
closed = true; closed = true;
contextListenerShouldBeRemoved = true; cancelListenersShouldBeRemoved = true;
observer.onClose(savedStatus, savedTrailers); observer.onClose(savedStatus, savedTrailers);
} finally { } finally {
context.removeListener(ClientCallImpl.this); removeContextListenerAndCancelDeadlineFuture();
} }
} }
}); });

View File

@ -58,6 +58,7 @@ import io.grpc.CallOptions;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.Codec; import io.grpc.Codec;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -535,6 +536,68 @@ public class ClientCallImplTest {
assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos); assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos);
} }
@Test
public void expiredDeadlineCancelsStream_CallOptions() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
DESCRIPTOR,
MoreExecutors.directExecutor(),
CallOptions.DEFAULT.withDeadline(Deadline.after(1000, TimeUnit.MILLISECONDS)),
provider,
deadlineCancellationExecutor);
call.start(callListener, new Metadata());
fakeClock.forwardMillis(1001);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
}
@Test
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
Context.current()
.withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor)
.attach();
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
DESCRIPTOR,
MoreExecutors.directExecutor(),
CallOptions.DEFAULT,
provider,
deadlineCancellationExecutor);
call.start(callListener, new Metadata());
fakeClock.forwardMillis(TimeUnit.SECONDS.toMillis(1001));
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
}
@Test
public void streamCancelAbortsDeadlineTimer() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
DESCRIPTOR,
MoreExecutors.directExecutor(),
CallOptions.DEFAULT.withDeadline(Deadline.after(1000, TimeUnit.MILLISECONDS)),
provider,
deadlineCancellationExecutor);
call.start(callListener, new Metadata());
call.cancel("canceled", null);
// Run the deadline timer, which should have been cancelled by the previous call to cancel()
fakeClock.forwardMillis(1001);
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
}
/** /**
* Without a context or call options deadline, * Without a context or call options deadline,
* a timeout should not be set in metadata. * a timeout should not be set in metadata.