From 867c76d185d210e5a745e820cc78ff320d726215 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 13 Jul 2015 17:15:34 -0700 Subject: [PATCH] Separate ServerCall binding utilities per method type. This gives us more flexibility in API changes in the future. Unary call and server streaming call should call the flow-control method call.request() only once. Previously it was called whenever a request arrives, which is wrong. Now it's fixed. Resolves #436 --- .../grpc/io/grpc/testing/TestServiceGrpc.java | 14 +-- .../main/grpc/io/grpc/testing/WorkerGrpc.java | 14 +-- .../src/java_plugin/cpp/java_generator.cpp | 33 +++++-- compiler/src/test/golden/TestService.java.txt | 26 +++--- .../src/test/golden/TestServiceNano.java.txt | 26 +++--- .../grpc/examples/helloworld/GreeterGrpc.java | 10 ++- .../examples/routeguide/RouteGuideGrpc.java | 22 ++--- .../testing/integration/TestServiceGrpc.java | 30 ++++--- .../main/java/io/grpc/stub/ServerCalls.java | 88 ++++++++++++++++--- 9 files changed, 177 insertions(+), 86 deletions(-) diff --git a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java index adc89b86c4..d2ac474ba3 100644 --- a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java +++ b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class TestServiceGrpc { @@ -204,8 +206,8 @@ public class TestServiceGrpc { return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_UNARY_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.testing.SimpleRequest, io.grpc.testing.SimpleResponse>() { @java.lang.Override @@ -217,8 +219,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.SimpleRequest, io.grpc.testing.SimpleResponse>() { @java.lang.Override diff --git a/benchmarks/src/generated/main/grpc/io/grpc/testing/WorkerGrpc.java b/benchmarks/src/generated/main/grpc/io/grpc/testing/WorkerGrpc.java index daca204d40..37dbacf48a 100644 --- a/benchmarks/src/generated/main/grpc/io/grpc/testing/WorkerGrpc.java +++ b/benchmarks/src/generated/main/grpc/io/grpc/testing/WorkerGrpc.java @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class WorkerGrpc { @@ -186,8 +188,8 @@ public class WorkerGrpc { return io.grpc.ServerServiceDefinition.builder("grpc.testing.Worker") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_RUN_TEST, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.ClientArgs, io.grpc.testing.ClientStatus>() { @java.lang.Override @@ -198,8 +200,8 @@ public class WorkerGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_RUN_SERVER, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.ServerArgs, io.grpc.testing.ServerStatus>() { @java.lang.Override diff --git a/compiler/src/java_plugin/cpp/java_generator.cpp b/compiler/src/java_plugin/cpp/java_generator.cpp index 5795157c1d..bac19ca590 100644 --- a/compiler/src/java_plugin/cpp/java_generator.cpp +++ b/compiler/src/java_plugin/cpp/java_generator.cpp @@ -512,14 +512,27 @@ static void PrintBindServiceMethod(const ServiceDescriptor* service, (*vars)["input_type"] = MessageFullJavaName(method->input_type()); (*vars)["output_type"] = MessageFullJavaName(method->output_type()); bool client_streaming = method->client_streaming(); + bool server_streaming = method->server_streaming(); if (client_streaming) { - (*vars)["calls_method"] = "asyncStreamingRequestCall"; - (*vars)["invocation_class"] = - "io.grpc.stub.ServerCalls.StreamingRequestMethod"; + if (server_streaming) { + (*vars)["calls_method"] = "asyncDuplexStreamingCall"; + (*vars)["invocation_class"] = + "io.grpc.stub.ServerCalls.DuplexStreamingMethod"; + } else { + (*vars)["calls_method"] = "asyncClientStreamingCall"; + (*vars)["invocation_class"] = + "io.grpc.stub.ServerCalls.ClientStreamingMethod"; + } } else { - (*vars)["calls_method"] = "asyncUnaryRequestCall"; - (*vars)["invocation_class"] = - "io.grpc.stub.ServerCalls.UnaryRequestMethod"; + if (server_streaming) { + (*vars)["calls_method"] = "asyncServerStreamingCall"; + (*vars)["invocation_class"] = + "io.grpc.stub.ServerCalls.ServerStreamingMethod"; + } else { + (*vars)["calls_method"] = "asyncUnaryCall"; + (*vars)["invocation_class"] = + "io.grpc.stub.ServerCalls.UnaryMethod"; + } } p->Print(*vars, ".addMethod($ServerMethodDefinition$.create(\n"); p->Indent(); @@ -647,9 +660,13 @@ void PrintImports(Printer* p, bool generate_nano) { "import static " "io.grpc.stub.ClientCalls.unaryFutureCall;\n" "import static " - "io.grpc.stub.ServerCalls.asyncUnaryRequestCall;\n" + "io.grpc.stub.ServerCalls.asyncUnaryCall;\n" "import static " - "io.grpc.stub.ServerCalls.asyncStreamingRequestCall;\n\n"); + "io.grpc.stub.ServerCalls.asyncServerStreamingCall;\n" + "import static " + "io.grpc.stub.ServerCalls.asyncClientStreamingCall;\n" + "import static " + "io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;\n\n"); if (generate_nano) { p->Print("import java.io.IOException;\n\n"); } diff --git a/compiler/src/test/golden/TestService.java.txt b/compiler/src/test/golden/TestService.java.txt index a8b4f47bdb..13d92f3733 100644 --- a/compiler/src/test/golden/TestService.java.txt +++ b/compiler/src/test/golden/TestService.java.txt @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class TestServiceGrpc { @@ -289,8 +291,8 @@ public class TestServiceGrpc { return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_UNARY_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.testing.integration.Test.SimpleRequest, io.grpc.testing.integration.Test.SimpleResponse>() { @java.lang.Override @@ -302,8 +304,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_OUTPUT_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncServerStreamingCall( + new io.grpc.stub.ServerCalls.ServerStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override @@ -315,8 +317,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_INPUT_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncClientStreamingCall( + new io.grpc.stub.ServerCalls.ClientStreamingMethod< io.grpc.testing.integration.Test.StreamingInputCallRequest, io.grpc.testing.integration.Test.StreamingInputCallResponse>() { @java.lang.Override @@ -327,8 +329,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_FULL_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override @@ -339,8 +341,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_HALF_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override diff --git a/compiler/src/test/golden/TestServiceNano.java.txt b/compiler/src/test/golden/TestServiceNano.java.txt index 114f6714bd..07e98a8069 100644 --- a/compiler/src/test/golden/TestServiceNano.java.txt +++ b/compiler/src/test/golden/TestServiceNano.java.txt @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; import java.io.IOException; @@ -351,8 +353,8 @@ public class TestServiceGrpc { return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_UNARY_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.testing.integration.Test.SimpleRequest, io.grpc.testing.integration.Test.SimpleResponse>() { @java.lang.Override @@ -364,8 +366,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_OUTPUT_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncServerStreamingCall( + new io.grpc.stub.ServerCalls.ServerStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override @@ -377,8 +379,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_INPUT_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncClientStreamingCall( + new io.grpc.stub.ServerCalls.ClientStreamingMethod< io.grpc.testing.integration.Test.StreamingInputCallRequest, io.grpc.testing.integration.Test.StreamingInputCallResponse>() { @java.lang.Override @@ -389,8 +391,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_FULL_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override @@ -401,8 +403,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_HALF_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Test.StreamingOutputCallRequest, io.grpc.testing.integration.Test.StreamingOutputCallResponse>() { @java.lang.Override diff --git a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java index d625d4e8fe..5e25372750 100644 --- a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java +++ b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class GreeterGrpc { @@ -179,8 +181,8 @@ public class GreeterGrpc { return io.grpc.ServerServiceDefinition.builder("helloworld.Greeter") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_SAY_HELLO, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.examples.helloworld.HelloRequest, io.grpc.examples.helloworld.HelloResponse>() { @java.lang.Override diff --git a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java index 5ef8d6e10c..9a1b805a27 100644 --- a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java +++ b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class RouteGuideGrpc { @@ -264,8 +266,8 @@ public class RouteGuideGrpc { return io.grpc.ServerServiceDefinition.builder("routeguide.RouteGuide") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_GET_FEATURE, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.examples.routeguide.Point, io.grpc.examples.routeguide.Feature>() { @java.lang.Override @@ -277,8 +279,8 @@ public class RouteGuideGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_LIST_FEATURES, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncServerStreamingCall( + new io.grpc.stub.ServerCalls.ServerStreamingMethod< io.grpc.examples.routeguide.Rectangle, io.grpc.examples.routeguide.Feature>() { @java.lang.Override @@ -290,8 +292,8 @@ public class RouteGuideGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_RECORD_ROUTE, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncClientStreamingCall( + new io.grpc.stub.ServerCalls.ClientStreamingMethod< io.grpc.examples.routeguide.Point, io.grpc.examples.routeguide.RouteSummary>() { @java.lang.Override @@ -302,8 +304,8 @@ public class RouteGuideGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_ROUTE_CHAT, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.examples.routeguide.RouteNote, io.grpc.examples.routeguide.RouteNote>() { @java.lang.Override diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java index f579b9a87b..360e18b431 100644 --- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java +++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java @@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall; import static io.grpc.stub.ClientCalls.blockingUnaryCall; import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; import static io.grpc.stub.ClientCalls.unaryFutureCall; -import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall; -import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall; +import static io.grpc.stub.ServerCalls.asyncUnaryCall; +import static io.grpc.stub.ServerCalls.asyncServerStreamingCall; +import static io.grpc.stub.ServerCalls.asyncClientStreamingCall; +import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall; @javax.annotation.Generated("by gRPC proto compiler") public class TestServiceGrpc { @@ -332,8 +334,8 @@ public class TestServiceGrpc { return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService") .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_EMPTY_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< com.google.protobuf.EmptyProtos.Empty, com.google.protobuf.EmptyProtos.Empty>() { @java.lang.Override @@ -345,8 +347,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_UNARY_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncUnaryCall( + new io.grpc.stub.ServerCalls.UnaryMethod< io.grpc.testing.integration.Messages.SimpleRequest, io.grpc.testing.integration.Messages.SimpleResponse>() { @java.lang.Override @@ -358,8 +360,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_OUTPUT_CALL, - asyncUnaryRequestCall( - new io.grpc.stub.ServerCalls.UnaryRequestMethod< + asyncServerStreamingCall( + new io.grpc.stub.ServerCalls.ServerStreamingMethod< io.grpc.testing.integration.Messages.StreamingOutputCallRequest, io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() { @java.lang.Override @@ -371,8 +373,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_STREAMING_INPUT_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncClientStreamingCall( + new io.grpc.stub.ServerCalls.ClientStreamingMethod< io.grpc.testing.integration.Messages.StreamingInputCallRequest, io.grpc.testing.integration.Messages.StreamingInputCallResponse>() { @java.lang.Override @@ -383,8 +385,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_FULL_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Messages.StreamingOutputCallRequest, io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() { @java.lang.Override @@ -395,8 +397,8 @@ public class TestServiceGrpc { }))) .addMethod(io.grpc.ServerMethodDefinition.create( METHOD_HALF_DUPLEX_CALL, - asyncStreamingRequestCall( - new io.grpc.stub.ServerCalls.StreamingRequestMethod< + asyncDuplexStreamingCall( + new io.grpc.stub.ServerCalls.DuplexStreamingMethod< io.grpc.testing.integration.Messages.StreamingOutputCallRequest, io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() { @java.lang.Override diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 0c6593beff..573d268764 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -45,19 +45,88 @@ public class ServerCalls { private ServerCalls() { } + /** + * Creates a {@code ServerCallHandler} for a unary call method of the service. + * + * @param method an adaptor to the actual method on the service implementation. + */ + public static ServerCallHandler asyncUnaryCall( + final UnaryMethod method) { + return asyncUnaryRequestCall(method); + } + + /** + * Creates a {@code ServerCallHandler} for a server streaming method of the service. + * + * @param method an adaptor to the actual method on the service implementation. + */ + public static ServerCallHandler asyncServerStreamingCall( + final ServerStreamingMethod method) { + return asyncUnaryRequestCall(method); + } + + /** + * Creates a {@code ServerCallHandler} for a client streaming method of the service. + * + * @param method an adaptor to the actual method on the service implementation. + */ + public static ServerCallHandler asyncClientStreamingCall( + final ClientStreamingMethod method) { + return asyncStreamingRequestCall(method); + } + + /** + * Creates a {@code ServerCallHandler} for a duplex streaming method of the service. + * + * @param method an adaptor to the actual method on the service implementation. + */ + public static ServerCallHandler asyncDuplexStreamingCall( + final DuplexStreamingMethod method) { + return asyncStreamingRequestCall(method); + } + + /** + * Adaptor to a unary call method. + */ + public static interface UnaryMethod extends UnaryRequestMethod { + } + + /** + * Adaptor to a server streaming method. + */ + public static interface ServerStreamingMethod + extends UnaryRequestMethod { + } + + /** + * Adaptor to a client streaming method. + */ + public static interface ClientStreamingMethod + extends StreamingRequestMethod { + } + + /** + * Adaptor to a bi-directional streaming method. + */ + public static interface DuplexStreamingMethod + extends StreamingRequestMethod { + } + /** * Creates a {@code ServerCallHandler} for a unary request call method of the service. * * @param method an adaptor to the actual method on the service implementation. */ - public static ServerCallHandler asyncUnaryRequestCall( + private static ServerCallHandler asyncUnaryRequestCall( final UnaryRequestMethod method) { return new ServerCallHandler() { @Override public ServerCall.Listener startCall( String fullMethodName, final ServerCall call, Metadata.Headers headers) { final ResponseObserver responseObserver = new ResponseObserver(call); - call.request(1); + // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client + // sends more than 1 requests, we will catch it in onPayload() and emit INVALID_ARGUMENT. + call.request(2); return new EmptyServerCallListener() { ReqT request; @Override @@ -66,9 +135,6 @@ public class ServerCalls { // We delay calling method.invoke() until onHalfClose(), because application may call // close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose(). this.request = request; - - // Request delivery of the next inbound message. - call.request(1); } else { call.close( Status.INVALID_ARGUMENT.withDescription( @@ -101,7 +167,7 @@ public class ServerCalls { * * @param method an adaptor to the actual method on the service implementation. */ - public static ServerCallHandler asyncStreamingRequestCall( + private static ServerCallHandler asyncStreamingRequestCall( final StreamingRequestMethod method) { return new ServerCallHandler() { @Override @@ -139,17 +205,11 @@ public class ServerCalls { }; } - /** - * Adaptor to a unary call or server streaming method. - */ - public static interface UnaryRequestMethod { + private static interface UnaryRequestMethod { void invoke(ReqT request, StreamObserver responseObserver); } - /** - * Adaptor to a client stremaing or bi-directional stremaing method. - */ - public static interface StreamingRequestMethod { + private static interface StreamingRequestMethod { StreamObserver invoke(StreamObserver responseObserver); }