From 63d82746615a5a168e8a6057ed8f61cf90648995 Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Thu, 16 Jun 2016 16:02:17 +0200 Subject: [PATCH] core: don't create a new context for each client call. Fixes #1926 Also, undo the hack that makes sure AsyncClient's stack doesn't overflow as it's no longer needed. --- .../io/grpc/benchmarks/qps/AsyncClient.java | 14 +--- .../java/io/grpc/internal/ClientCallImpl.java | 74 ++++++++++++------- .../io/grpc/internal/ClientCallImplTest.java | 63 ++++++++++++++++ 3 files changed, 116 insertions(+), 35 deletions(-) diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java index 6cefafbec6..d02a06bd5e 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java @@ -54,7 +54,6 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import io.grpc.Channel; -import io.grpc.Context; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; @@ -199,15 +198,10 @@ public class AsyncClient { histogram.recordValue((now - lastCall) / 1000); lastCall = now; - Context prevCtx = Context.ROOT.attach(); - try { - if (endTime > now) { - stub.unaryCall(request, this); - } else { - future.done(); - } - } finally { - Context.current().detach(prevCtx); + if (endTime > now) { + stub.unaryCall(request, this); + } else { + future.done(); } } }); diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index f21611de2e..366d0f7b30 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -63,6 +63,7 @@ import java.io.InputStream; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -79,12 +80,12 @@ final class ClientCallImpl extends ClientCall private final MethodDescriptor method; private final Executor callExecutor; - private final Context parentContext; - private volatile Context context; + private final Context context; + private volatile ScheduledFuture deadlineCancellationFuture; private final boolean unaryRequest; private final CallOptions callOptions; private ClientStream stream; - private volatile boolean contextListenerShouldBeRemoved; + private volatile boolean cancelListenersShouldBeRemoved; private boolean cancelCalled; private boolean halfCloseCalled; private final ClientTransportProvider clientTransportProvider; @@ -103,7 +104,7 @@ final class ClientCallImpl extends ClientCall ? new SerializeReentrantCallsDirectExecutor() : new SerializingExecutor(executor); // Propagate the context from the thread which initiated the call to all callbacks. - this.parentContext = Context.current(); + this.context = Context.current(); this.unaryRequest = method.getType() == MethodType.UNARY || method.getType() == MethodType.SERVER_STREAMING; this.callOptions = callOptions; @@ -157,14 +158,6 @@ final class ClientCallImpl extends ClientCall checkNotNull(observer, "observer"); checkNotNull(headers, "headers"); - // Create the context - final Deadline effectiveDeadline = min(callOptions.getDeadline(), parentContext.getDeadline()); - if (effectiveDeadline != parentContext.getDeadline()) { - context = parentContext.withDeadline(effectiveDeadline, deadlineCancellationExecutor); - } else { - context = parentContext.withCancellation(); - } - if (context.isCancelled()) { // Context is already cancelled so no need to create a real stream, just notify the observer // of cancellation via callback on the executor @@ -200,10 +193,11 @@ final class ClientCallImpl extends ClientCall prepareHeaders(headers, decompressorRegistry, compressor); - final boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); + Deadline effectiveDeadline = effectiveDeadline(); + boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired(); if (!deadlineExceeded) { updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), - parentContext.getDeadline(), headers); + context.getDeadline(), headers); ClientTransport transport = clientTransportProvider.get(callOptions); stream = transport.newStream(method, headers, callOptions); } else { @@ -214,7 +208,6 @@ final class ClientCallImpl extends ClientCall stream.setAuthority(callOptions.getAuthority()); } stream.setCompressor(compressor); - stream.start(new ClientStreamListenerImpl(observer)); // Delay any sources of cancellation after start(), because most of the transports are broken if @@ -222,11 +215,17 @@ final class ClientCallImpl extends ClientCall // Propagate later Context cancellation to the remote side. context.addListener(this, directExecutor()); - if (contextListenerShouldBeRemoved) { + if (effectiveDeadline != null + // If the context has the effective deadline, we don't need to schedule an extra task. + && context.getDeadline() != effectiveDeadline) { + deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline); + } + if (cancelListenersShouldBeRemoved) { // Race detected! ClientStreamListener.closed may have been called before - // deadlineCancellationFuture was set, thereby preventing the future from being cancelled. - // Go ahead and cancel again, just to be sure it was cancelled. - context.removeListener(this); + // deadlineCancellationFuture was set / context listener added, thereby preventing the future + // and listener from being cancelled. Go ahead and cancel again, just to be sure it + // was cancelled. + removeContextListenerAndCancelDeadlineFuture(); } } @@ -268,6 +267,33 @@ final class ClientCallImpl extends ClientCall log.info(builder.toString()); } + private void removeContextListenerAndCancelDeadlineFuture() { + context.removeListener(this); + ScheduledFuture f = deadlineCancellationFuture; + if (f != null) { + f.cancel(false); + } + } + + private ScheduledFuture startDeadlineTimer(Deadline deadline) { + return deadlineCancellationExecutor.schedule(new LogExceptionRunnable( + new Runnable() { + @Override + public void run() { + // DelayedStream.cancel() is safe to call from a thread that is different from where the + // stream is created. + stream.cancel(DEADLINE_EXCEEDED); + } + }), deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + } + + @Nullable + private Deadline effectiveDeadline() { + // Call options and context are immutable, so we don't need to cache the deadline. + return min(callOptions.getDeadline(), context.getDeadline()); + } + + @Nullable private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { if (deadline0 == null) { return deadline1; @@ -311,9 +337,7 @@ final class ClientCallImpl extends ClientCall stream.cancel(status); } } finally { - if (context != null) { - context.removeListener(ClientCallImpl.this); - } + removeContextListenerAndCancelDeadlineFuture(); } } @@ -422,7 +446,7 @@ final class ClientCallImpl extends ClientCall @Override public void closed(Status status, Metadata trailers) { - Deadline deadline = context.getDeadline(); + Deadline deadline = effectiveDeadline(); if (status.getCode() == Status.Code.CANCELLED && deadline != null) { // When the server's deadline expires, it can only reset the stream with CANCEL and no // description. Since our timer may be delayed in firing, we double-check the deadline and @@ -440,10 +464,10 @@ final class ClientCallImpl extends ClientCall public final void runInContext() { try { closed = true; - contextListenerShouldBeRemoved = true; + cancelListenersShouldBeRemoved = true; observer.onClose(savedStatus, savedTrailers); } finally { - context.removeListener(ClientCallImpl.this); + removeContextListenerAndCancelDeadlineFuture(); } } }); diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 656cdfe3d0..8a73fb0b78 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -58,6 +58,7 @@ import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Codec; import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; @@ -535,6 +536,68 @@ public class ClientCallImplTest { assertTimeoutBetween(timeout, callOptsNanos - deltaNanos, callOptsNanos); } + @Test + public void expiredDeadlineCancelsStream_CallOptions() { + fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + CallOptions.DEFAULT.withDeadline(Deadline.after(1000, TimeUnit.MILLISECONDS)), + provider, + deadlineCancellationExecutor); + + call.start(callListener, new Metadata()); + + fakeClock.forwardMillis(1001); + + verify(stream, times(1)).cancel(statusCaptor.capture()); + assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); + } + + @Test + public void expiredDeadlineCancelsStream_Context() { + fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); + + Context.current() + .withDeadlineAfter(1000, TimeUnit.MILLISECONDS, deadlineCancellationExecutor) + .attach(); + + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + CallOptions.DEFAULT, + provider, + deadlineCancellationExecutor); + + call.start(callListener, new Metadata()); + + fakeClock.forwardMillis(TimeUnit.SECONDS.toMillis(1001)); + + verify(stream, times(1)).cancel(statusCaptor.capture()); + assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); + } + + @Test + public void streamCancelAbortsDeadlineTimer() { + fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS); + + ClientCallImpl call = new ClientCallImpl( + DESCRIPTOR, + MoreExecutors.directExecutor(), + CallOptions.DEFAULT.withDeadline(Deadline.after(1000, TimeUnit.MILLISECONDS)), + provider, + deadlineCancellationExecutor); + call.start(callListener, new Metadata()); + call.cancel("canceled", null); + + // Run the deadline timer, which should have been cancelled by the previous call to cancel() + fakeClock.forwardMillis(1001); + + verify(stream, times(1)).cancel(statusCaptor.capture()); + + assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); + } + /** * Without a context or call options deadline, * a timeout should not be set in metadata.