Optimize blocking calls to avoid app thread pool

This reduces the necessary number of threads in the application executor
and provides a small improvement in latency (~15μs, which is normally in
the noise, but would be a 5% improvement).

Benchmark                         (direct)  (transport)  Mode  Cnt       Score        Error  Units
Before:
TransportBenchmark.unaryCall1024      true    INPROCESS  avgt   10    1566.168 ±     13.677  ns/op
TransportBenchmark.unaryCall1024     false    INPROCESS  avgt   10   35769.532 ±   2358.967  ns/op
After:
TransportBenchmark.unaryCall1024      true    INPROCESS  avgt   10    1813.778 ±     19.995  ns/op
TransportBenchmark.unaryCall1024     false    INPROCESS  avgt   10   18568.223 ±   1679.306  ns/op

The benchmark results are exactly what we would expect, assuming that
half of the benefit of direct is on server and half on client:
1566 + (35769 - 1566) / 2 = 18668 ns --vs-- 18568 ns

It is expected that direct=true would get worse, because
SerializingExecutor is now used instead of
SerializeReentrantCallsDirectExecutor plus the additional cost of
ThreadlessExecutor.

In the future we could try to detect the ThreadlessExecutor and ellide
Serializ*Executor completely (as is possible for any single-threaded
executor). We could also optimize the queue used in ThreadlessExecutor
to be single-producer, single-consumer. I don't expect to do those
optimizations soon, however.
This commit is contained in:
Eric Anderson 2015-08-04 16:37:00 -07:00
parent f59e04f310
commit 4168f67e38
11 changed files with 120 additions and 22 deletions

View File

@ -126,7 +126,7 @@ public class TestServiceGrpc {
@java.lang.Override @java.lang.Override
public io.grpc.testing.SimpleResponse unaryCall(io.grpc.testing.SimpleRequest request) { public io.grpc.testing.SimpleResponse unaryCall(io.grpc.testing.SimpleRequest request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
} }
} }

View File

@ -129,6 +129,7 @@ public class TransportBenchmark {
if (direct) { if (direct) {
serverBuilder.directExecutor(); serverBuilder.directExecutor();
// Because blocking stubs avoid the executor, this doesn't do much.
channelBuilder.directExecutor(); channelBuilder.directExecutor();
} }

View File

@ -407,7 +407,7 @@ static void PrintStub(
p->Print( p->Print(
*vars, *vars,
"return $calls_method$(\n" "return $calls_method$(\n"
" getChannel().newCall($method_field_name$, getCallOptions()), $params$);\n"); " getChannel(), $method_field_name$, getCallOptions(), $params$);\n");
break; break;
case ASYNC_CALL: case ASYNC_CALL:
if (server_streaming) { if (server_streaming) {

View File

@ -186,14 +186,14 @@ public class TestServiceGrpc {
@java.lang.Override @java.lang.Override
public io.grpc.testing.integration.Test.SimpleResponse unaryCall(io.grpc.testing.integration.Test.SimpleRequest request) { public io.grpc.testing.integration.Test.SimpleResponse unaryCall(io.grpc.testing.integration.Test.SimpleRequest request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.Test.StreamingOutputCallResponse> streamingOutputCall( public java.util.Iterator<io.grpc.testing.integration.Test.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.Test.StreamingOutputCallRequest request) { io.grpc.testing.integration.Test.StreamingOutputCallRequest request) {
return blockingServerStreamingCall( return blockingServerStreamingCall(
getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
} }
} }

View File

@ -264,14 +264,14 @@ public class TestServiceGrpc {
@java.lang.Override @java.lang.Override
public io.grpc.testing.integration.nano.Test.SimpleResponse unaryCall(io.grpc.testing.integration.nano.Test.SimpleRequest request) { public io.grpc.testing.integration.nano.Test.SimpleResponse unaryCall(io.grpc.testing.integration.nano.Test.SimpleRequest request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.nano.Test.StreamingOutputCallResponse> streamingOutputCall( public java.util.Iterator<io.grpc.testing.integration.nano.Test.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.nano.Test.StreamingOutputCallRequest request) { io.grpc.testing.integration.nano.Test.StreamingOutputCallRequest request) {
return blockingServerStreamingCall( return blockingServerStreamingCall(
getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
} }
} }

View File

@ -107,7 +107,7 @@ public class GreeterGrpc {
@java.lang.Override @java.lang.Override
public io.grpc.examples.helloworld.HelloResponse sayHello(io.grpc.examples.helloworld.HelloRequest request) { public io.grpc.examples.helloworld.HelloResponse sayHello(io.grpc.examples.helloworld.HelloRequest request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_SAY_HELLO, getCallOptions()), request); getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);
} }
} }

View File

@ -167,14 +167,14 @@ public class RouteGuideGrpc {
@java.lang.Override @java.lang.Override
public io.grpc.examples.routeguide.Feature getFeature(io.grpc.examples.routeguide.Point request) { public io.grpc.examples.routeguide.Feature getFeature(io.grpc.examples.routeguide.Point request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_GET_FEATURE, getCallOptions()), request); getChannel(), METHOD_GET_FEATURE, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public java.util.Iterator<io.grpc.examples.routeguide.Feature> listFeatures( public java.util.Iterator<io.grpc.examples.routeguide.Feature> listFeatures(
io.grpc.examples.routeguide.Rectangle request) { io.grpc.examples.routeguide.Rectangle request) {
return blockingServerStreamingCall( return blockingServerStreamingCall(
getChannel().newCall(METHOD_LIST_FEATURES, getCallOptions()), request); getChannel(), METHOD_LIST_FEATURES, getCallOptions(), request);
} }
} }

View File

@ -131,13 +131,13 @@ public class ReconnectServiceGrpc {
@java.lang.Override @java.lang.Override
public com.google.protobuf.EmptyProtos.Empty start(com.google.protobuf.EmptyProtos.Empty request) { public com.google.protobuf.EmptyProtos.Empty start(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_START, getCallOptions()), request); getChannel(), METHOD_START, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public io.grpc.testing.integration.Messages.ReconnectInfo stop(com.google.protobuf.EmptyProtos.Empty request) { public io.grpc.testing.integration.Messages.ReconnectInfo stop(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_STOP, getCallOptions()), request); getChannel(), METHOD_STOP, getCallOptions(), request);
} }
} }

View File

@ -210,20 +210,20 @@ public class TestServiceGrpc {
@java.lang.Override @java.lang.Override
public com.google.protobuf.EmptyProtos.Empty emptyCall(com.google.protobuf.EmptyProtos.Empty request) { public com.google.protobuf.EmptyProtos.Empty emptyCall(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_EMPTY_CALL, getCallOptions()), request); getChannel(), METHOD_EMPTY_CALL, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public io.grpc.testing.integration.Messages.SimpleResponse unaryCall(io.grpc.testing.integration.Messages.SimpleRequest request) { public io.grpc.testing.integration.Messages.SimpleResponse unaryCall(io.grpc.testing.integration.Messages.SimpleRequest request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
} }
@java.lang.Override @java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.Messages.StreamingOutputCallResponse> streamingOutputCall( public java.util.Iterator<io.grpc.testing.integration.Messages.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) { io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) {
return blockingServerStreamingCall( return blockingServerStreamingCall(
getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
} }
} }

View File

@ -107,7 +107,7 @@ public class UnimplementedServiceGrpc {
@java.lang.Override @java.lang.Override
public com.google.protobuf.EmptyProtos.Empty unimplementedCall(com.google.protobuf.EmptyProtos.Empty request) { public com.google.protobuf.EmptyProtos.Empty unimplementedCall(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall( return blockingUnaryCall(
getChannel().newCall(METHOD_UNIMPLEMENTED_CALL, getCallOptions()), request); getChannel(), METHOD_UNIMPLEMENTED_CALL, getCallOptions(), request);
} }
} }

View File

@ -36,8 +36,11 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import java.util.Iterator; import java.util.Iterator;
@ -45,7 +48,11 @@ import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -55,6 +62,8 @@ import javax.annotation.Nullable;
* that the runtime can vary behavior without requiring regeneration of the stub. * that the runtime can vary behavior without requiring regeneration of the stub.
*/ */
public class ClientCalls { public class ClientCalls {
private static final Logger log = Logger.getLogger(ClientCalls.class.getName());
// Prevent instantiation // Prevent instantiation
private ClientCalls() {} private ClientCalls() {}
@ -110,6 +119,32 @@ public class ClientCalls {
} }
} }
/**
* Executes a unary call and blocks on the response.
*
* @return the single response message.
*/
public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
return getUnchecked(responseFuture);
} catch (Throwable t) {
call.cancel();
throw Throwables.propagate(t);
}
}
/** /**
* Executes a server-streaming call returning a blocking {@link Iterator} over the * Executes a server-streaming call returning a blocking {@link Iterator} over the
* response stream. * response stream.
@ -123,6 +158,22 @@ public class ClientCalls {
return result; return result;
} }
/**
* Executes a server-streaming call returning a blocking {@link Iterator} over the
* response stream.
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor);
asyncUnaryRequestCall(call, param, result.listener(), true);
return result;
}
/** /**
* Executes a unary call and returns a {@link ListenableFuture} to the response. * Executes a unary call and returns a {@link ListenableFuture} to the response.
* *
@ -345,27 +396,49 @@ public class ClientCalls {
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2); private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2);
private final ClientCall.Listener<T> listener = new QueuingListener(); private final ClientCall.Listener<T> listener = new QueuingListener();
private final ClientCall<?, T> call; private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating. // Only accessed when iterating.
private Object last; private Object last;
private BlockingResponseStream(ClientCall<?, T> call) { private BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}
private BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call; this.call = call;
this.threadless = threadless;
} }
ClientCall.Listener<T> listener() { ClientCall.Listener<T> listener() {
return listener; return listener;
} }
private Object waitForNext() throws InterruptedException {
if (threadless == null) {
return buffer.take();
} else {
Object next = buffer.poll();
while (next == null) {
threadless.waitAndDrain();
next = buffer.poll();
}
return next;
}
}
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (last == null) {
try { try {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed. // hangs here as the call will become closed.
last = (last == null) ? buffer.take() : last; last = waitForNext();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.interrupted(); Thread.interrupted();
throw new RuntimeException(ie); throw new RuntimeException(ie);
} }
}
if (last instanceof Status) { if (last instanceof Status) {
throw ((Status) last).asRuntimeException(); throw ((Status) last).asRuntimeException();
} }
@ -417,4 +490,28 @@ public class ClientCalls {
} }
} }
} }
private static class ThreadlessExecutor implements Executor {
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
/**
* Waits until there is a Runnable, then executes it and all queued Runnables after it.
*/
public void waitAndDrain() throws InterruptedException {
Runnable runnable = queue.take();
while (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
runnable = queue.poll();
}
}
@Override
public void execute(Runnable runnable) {
queue.add(runnable);
}
}
} }