diff --git a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java index 170a63196e..45881e3f67 100644 --- a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java +++ b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java @@ -126,7 +126,7 @@ public class TestServiceGrpc { @java.lang.Override public io.grpc.testing.SimpleResponse unaryCall(io.grpc.testing.SimpleRequest request) { return blockingUnaryCall( - getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); + getChannel(), METHOD_UNARY_CALL, getCallOptions(), request); } } diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java index 6de0adde83..8381bf5bfb 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java @@ -129,6 +129,7 @@ public class TransportBenchmark { if (direct) { serverBuilder.directExecutor(); + // Because blocking stubs avoid the executor, this doesn't do much. channelBuilder.directExecutor(); } diff --git a/compiler/src/java_plugin/cpp/java_generator.cpp b/compiler/src/java_plugin/cpp/java_generator.cpp index 49a1f54276..cdb17c0d9f 100644 --- a/compiler/src/java_plugin/cpp/java_generator.cpp +++ b/compiler/src/java_plugin/cpp/java_generator.cpp @@ -407,7 +407,7 @@ static void PrintStub( p->Print( *vars, "return $calls_method$(\n" - " getChannel().newCall($method_field_name$, getCallOptions()), $params$);\n"); + " getChannel(), $method_field_name$, getCallOptions(), $params$);\n"); break; case ASYNC_CALL: if (server_streaming) { diff --git a/compiler/src/test/golden/TestService.java.txt b/compiler/src/test/golden/TestService.java.txt index 77d5bb2ccb..904963ed51 100644 --- a/compiler/src/test/golden/TestService.java.txt +++ b/compiler/src/test/golden/TestService.java.txt @@ -186,14 +186,14 @@ public class TestServiceGrpc { @java.lang.Override public io.grpc.testing.integration.Test.SimpleResponse unaryCall(io.grpc.testing.integration.Test.SimpleRequest request) { return blockingUnaryCall( - getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); + getChannel(), METHOD_UNARY_CALL, getCallOptions(), request); } @java.lang.Override public java.util.Iterator streamingOutputCall( io.grpc.testing.integration.Test.StreamingOutputCallRequest request) { return blockingServerStreamingCall( - getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); + getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request); } } diff --git a/compiler/src/test/golden/TestServiceNano.java.txt b/compiler/src/test/golden/TestServiceNano.java.txt index f8c1ff3943..4d8d1bac10 100644 --- a/compiler/src/test/golden/TestServiceNano.java.txt +++ b/compiler/src/test/golden/TestServiceNano.java.txt @@ -264,14 +264,14 @@ public class TestServiceGrpc { @java.lang.Override public io.grpc.testing.integration.nano.Test.SimpleResponse unaryCall(io.grpc.testing.integration.nano.Test.SimpleRequest request) { return blockingUnaryCall( - getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); + getChannel(), METHOD_UNARY_CALL, getCallOptions(), request); } @java.lang.Override public java.util.Iterator streamingOutputCall( io.grpc.testing.integration.nano.Test.StreamingOutputCallRequest request) { return blockingServerStreamingCall( - getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); + getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request); } } diff --git a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java index ac400a5806..d423b2dc08 100644 --- a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java +++ b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java @@ -107,7 +107,7 @@ public class GreeterGrpc { @java.lang.Override public io.grpc.examples.helloworld.HelloResponse sayHello(io.grpc.examples.helloworld.HelloRequest request) { return blockingUnaryCall( - getChannel().newCall(METHOD_SAY_HELLO, getCallOptions()), request); + getChannel(), METHOD_SAY_HELLO, getCallOptions(), request); } } diff --git a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java index 6fe7108a8a..6f732d15a1 100644 --- a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java +++ b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java @@ -167,14 +167,14 @@ public class RouteGuideGrpc { @java.lang.Override public io.grpc.examples.routeguide.Feature getFeature(io.grpc.examples.routeguide.Point request) { return blockingUnaryCall( - getChannel().newCall(METHOD_GET_FEATURE, getCallOptions()), request); + getChannel(), METHOD_GET_FEATURE, getCallOptions(), request); } @java.lang.Override public java.util.Iterator listFeatures( io.grpc.examples.routeguide.Rectangle request) { return blockingServerStreamingCall( - getChannel().newCall(METHOD_LIST_FEATURES, getCallOptions()), request); + getChannel(), METHOD_LIST_FEATURES, getCallOptions(), request); } } diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java index 6a573ade98..d27309fead 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java @@ -131,13 +131,13 @@ public class ReconnectServiceGrpc { @java.lang.Override public com.google.protobuf.EmptyProtos.Empty start(com.google.protobuf.EmptyProtos.Empty request) { return blockingUnaryCall( - getChannel().newCall(METHOD_START, getCallOptions()), request); + getChannel(), METHOD_START, getCallOptions(), request); } @java.lang.Override public io.grpc.testing.integration.Messages.ReconnectInfo stop(com.google.protobuf.EmptyProtos.Empty request) { return blockingUnaryCall( - getChannel().newCall(METHOD_STOP, getCallOptions()), request); + getChannel(), METHOD_STOP, getCallOptions(), request); } } diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java index c06c7b5395..a0939a654b 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java @@ -210,20 +210,20 @@ public class TestServiceGrpc { @java.lang.Override public com.google.protobuf.EmptyProtos.Empty emptyCall(com.google.protobuf.EmptyProtos.Empty request) { return blockingUnaryCall( - getChannel().newCall(METHOD_EMPTY_CALL, getCallOptions()), request); + getChannel(), METHOD_EMPTY_CALL, getCallOptions(), request); } @java.lang.Override public io.grpc.testing.integration.Messages.SimpleResponse unaryCall(io.grpc.testing.integration.Messages.SimpleRequest request) { return blockingUnaryCall( - getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request); + getChannel(), METHOD_UNARY_CALL, getCallOptions(), request); } @java.lang.Override public java.util.Iterator streamingOutputCall( io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) { return blockingServerStreamingCall( - getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request); + getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request); } } diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java index 2436c26385..3cf6587d93 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java @@ -107,7 +107,7 @@ public class UnimplementedServiceGrpc { @java.lang.Override public com.google.protobuf.EmptyProtos.Empty unimplementedCall(com.google.protobuf.EmptyProtos.Empty request) { return blockingUnaryCall( - getChannel().newCall(METHOD_UNIMPLEMENTED_CALL, getCallOptions()), request); + getChannel(), METHOD_UNIMPLEMENTED_CALL, getCallOptions(), request); } } diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index a3f3c81823..73fa3f2636 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -36,8 +36,11 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; +import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.Status; import java.util.Iterator; @@ -45,7 +48,11 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; @@ -55,6 +62,8 @@ import javax.annotation.Nullable; * that the runtime can vary behavior without requiring regeneration of the stub. */ public class ClientCalls { + private static final Logger log = Logger.getLogger(ClientCalls.class.getName()); + // Prevent instantiation private ClientCalls() {} @@ -110,6 +119,32 @@ public class ClientCalls { } } + /** + * Executes a unary call and blocks on the response. + * + * @return the single response message. + */ + public static RespT blockingUnaryCall( + Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT param) { + ThreadlessExecutor executor = new ThreadlessExecutor(); + ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); + try { + ListenableFuture responseFuture = futureUnaryCall(call, param); + while (!responseFuture.isDone()) { + try { + executor.waitAndDrain(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return getUnchecked(responseFuture); + } catch (Throwable t) { + call.cancel(); + throw Throwables.propagate(t); + } + } + /** * Executes a server-streaming call returning a blocking {@link Iterator} over the * response stream. @@ -123,6 +158,22 @@ public class ClientCalls { return result; } + /** + * Executes a server-streaming call returning a blocking {@link Iterator} over the + * response stream. + * + * @return an iterator over the response stream. + */ + // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. + public static Iterator blockingServerStreamingCall( + Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT param) { + ThreadlessExecutor executor = new ThreadlessExecutor(); + ClientCall call = channel.newCall(method, callOptions.withExecutor(executor)); + BlockingResponseStream result = new BlockingResponseStream(call, executor); + asyncUnaryRequestCall(call, param, result.listener(), true); + return result; + } + /** * Executes a unary call and returns a {@link ListenableFuture} to the response. * @@ -345,26 +396,48 @@ public class ClientCalls { private final BlockingQueue buffer = new ArrayBlockingQueue(2); private final ClientCall.Listener listener = new QueuingListener(); private final ClientCall call; + /** May be null. */ + private final ThreadlessExecutor threadless; // Only accessed when iterating. private Object last; private BlockingResponseStream(ClientCall call) { + this(call, null); + } + + private BlockingResponseStream(ClientCall call, ThreadlessExecutor threadless) { this.call = call; + this.threadless = threadless; } ClientCall.Listener listener() { return listener; } + private Object waitForNext() throws InterruptedException { + if (threadless == null) { + return buffer.take(); + } else { + Object next = buffer.poll(); + while (next == null) { + threadless.waitAndDrain(); + next = buffer.poll(); + } + return next; + } + } + @Override public boolean hasNext() { - try { - // Will block here indefinitely waiting for content. RPC timeouts defend against permanent - // hangs here as the call will become closed. - last = (last == null) ? buffer.take() : last; - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new RuntimeException(ie); + if (last == null) { + try { + // Will block here indefinitely waiting for content. RPC timeouts defend against permanent + // hangs here as the call will become closed. + last = waitForNext(); + } catch (InterruptedException ie) { + Thread.interrupted(); + throw new RuntimeException(ie); + } } if (last instanceof Status) { throw ((Status) last).asRuntimeException(); @@ -417,4 +490,28 @@ public class ClientCalls { } } } + + private static class ThreadlessExecutor implements Executor { + private final BlockingQueue queue = new LinkedBlockingQueue(); + + /** + * Waits until there is a Runnable, then executes it and all queued Runnables after it. + */ + public void waitAndDrain() throws InterruptedException { + Runnable runnable = queue.take(); + while (runnable != null) { + try { + runnable.run(); + } catch (Throwable t) { + log.log(Level.WARNING, "Runnable threw exception", t); + } + runnable = queue.poll(); + } + } + + @Override + public void execute(Runnable runnable) { + queue.add(runnable); + } + } }