Separate ServerCall binding utilities per method type.

This gives us more flexibility in API changes in the future.

Unary call and server streaming call should call the flow-control method
call.request() only once. Previously it was called whenever a request
arrives, which is wrong. Now it's fixed.

Resolves #436
This commit is contained in:
Kun Zhang 2015-07-13 17:15:34 -07:00
parent 9c27540a4f
commit 867c76d185
9 changed files with 177 additions and 86 deletions

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class TestServiceGrpc {
@ -204,8 +206,8 @@ public class TestServiceGrpc {
return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_UNARY_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.testing.SimpleRequest,
io.grpc.testing.SimpleResponse>() {
@java.lang.Override
@ -217,8 +219,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.SimpleRequest,
io.grpc.testing.SimpleResponse>() {
@java.lang.Override

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class WorkerGrpc {
@ -186,8 +188,8 @@ public class WorkerGrpc {
return io.grpc.ServerServiceDefinition.builder("grpc.testing.Worker")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_RUN_TEST,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.ClientArgs,
io.grpc.testing.ClientStatus>() {
@java.lang.Override
@ -198,8 +200,8 @@ public class WorkerGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_RUN_SERVER,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.ServerArgs,
io.grpc.testing.ServerStatus>() {
@java.lang.Override

View File

@ -512,14 +512,27 @@ static void PrintBindServiceMethod(const ServiceDescriptor* service,
(*vars)["input_type"] = MessageFullJavaName(method->input_type());
(*vars)["output_type"] = MessageFullJavaName(method->output_type());
bool client_streaming = method->client_streaming();
bool server_streaming = method->server_streaming();
if (client_streaming) {
(*vars)["calls_method"] = "asyncStreamingRequestCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.StreamingRequestMethod";
if (server_streaming) {
(*vars)["calls_method"] = "asyncDuplexStreamingCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.DuplexStreamingMethod";
} else {
(*vars)["calls_method"] = "asyncClientStreamingCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.ClientStreamingMethod";
}
} else {
(*vars)["calls_method"] = "asyncUnaryRequestCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.UnaryRequestMethod";
if (server_streaming) {
(*vars)["calls_method"] = "asyncServerStreamingCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.ServerStreamingMethod";
} else {
(*vars)["calls_method"] = "asyncUnaryCall";
(*vars)["invocation_class"] =
"io.grpc.stub.ServerCalls.UnaryMethod";
}
}
p->Print(*vars, ".addMethod($ServerMethodDefinition$.create(\n");
p->Indent();
@ -647,9 +660,13 @@ void PrintImports(Printer* p, bool generate_nano) {
"import static "
"io.grpc.stub.ClientCalls.unaryFutureCall;\n"
"import static "
"io.grpc.stub.ServerCalls.asyncUnaryRequestCall;\n"
"io.grpc.stub.ServerCalls.asyncUnaryCall;\n"
"import static "
"io.grpc.stub.ServerCalls.asyncStreamingRequestCall;\n\n");
"io.grpc.stub.ServerCalls.asyncServerStreamingCall;\n"
"import static "
"io.grpc.stub.ServerCalls.asyncClientStreamingCall;\n"
"import static "
"io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;\n\n");
if (generate_nano) {
p->Print("import java.io.IOException;\n\n");
}

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class TestServiceGrpc {
@ -289,8 +291,8 @@ public class TestServiceGrpc {
return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_UNARY_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.testing.integration.Test.SimpleRequest,
io.grpc.testing.integration.Test.SimpleResponse>() {
@java.lang.Override
@ -302,8 +304,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_OUTPUT_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncServerStreamingCall(
new io.grpc.stub.ServerCalls.ServerStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override
@ -315,8 +317,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_INPUT_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncClientStreamingCall(
new io.grpc.stub.ServerCalls.ClientStreamingMethod<
io.grpc.testing.integration.Test.StreamingInputCallRequest,
io.grpc.testing.integration.Test.StreamingInputCallResponse>() {
@java.lang.Override
@ -327,8 +329,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_FULL_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override
@ -339,8 +341,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_HALF_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
import java.io.IOException;
@ -351,8 +353,8 @@ public class TestServiceGrpc {
return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_UNARY_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.testing.integration.Test.SimpleRequest,
io.grpc.testing.integration.Test.SimpleResponse>() {
@java.lang.Override
@ -364,8 +366,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_OUTPUT_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncServerStreamingCall(
new io.grpc.stub.ServerCalls.ServerStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override
@ -377,8 +379,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_INPUT_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncClientStreamingCall(
new io.grpc.stub.ServerCalls.ClientStreamingMethod<
io.grpc.testing.integration.Test.StreamingInputCallRequest,
io.grpc.testing.integration.Test.StreamingInputCallResponse>() {
@java.lang.Override
@ -389,8 +391,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_FULL_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override
@ -401,8 +403,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_HALF_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Test.StreamingOutputCallRequest,
io.grpc.testing.integration.Test.StreamingOutputCallResponse>() {
@java.lang.Override

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class GreeterGrpc {
@ -179,8 +181,8 @@ public class GreeterGrpc {
return io.grpc.ServerServiceDefinition.builder("helloworld.Greeter")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_SAY_HELLO,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.examples.helloworld.HelloRequest,
io.grpc.examples.helloworld.HelloResponse>() {
@java.lang.Override

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class RouteGuideGrpc {
@ -264,8 +266,8 @@ public class RouteGuideGrpc {
return io.grpc.ServerServiceDefinition.builder("routeguide.RouteGuide")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_GET_FEATURE,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.examples.routeguide.Point,
io.grpc.examples.routeguide.Feature>() {
@java.lang.Override
@ -277,8 +279,8 @@ public class RouteGuideGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_LIST_FEATURES,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncServerStreamingCall(
new io.grpc.stub.ServerCalls.ServerStreamingMethod<
io.grpc.examples.routeguide.Rectangle,
io.grpc.examples.routeguide.Feature>() {
@java.lang.Override
@ -290,8 +292,8 @@ public class RouteGuideGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_RECORD_ROUTE,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncClientStreamingCall(
new io.grpc.stub.ServerCalls.ClientStreamingMethod<
io.grpc.examples.routeguide.Point,
io.grpc.examples.routeguide.RouteSummary>() {
@java.lang.Override
@ -302,8 +304,8 @@ public class RouteGuideGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_ROUTE_CHAT,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.examples.routeguide.RouteNote,
io.grpc.examples.routeguide.RouteNote>() {
@java.lang.Override

View File

@ -7,8 +7,10 @@ import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
import static io.grpc.stub.ServerCalls.asyncDuplexStreamingCall;
@javax.annotation.Generated("by gRPC proto compiler")
public class TestServiceGrpc {
@ -332,8 +334,8 @@ public class TestServiceGrpc {
return io.grpc.ServerServiceDefinition.builder("grpc.testing.TestService")
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_EMPTY_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
com.google.protobuf.EmptyProtos.Empty,
com.google.protobuf.EmptyProtos.Empty>() {
@java.lang.Override
@ -345,8 +347,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_UNARY_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncUnaryCall(
new io.grpc.stub.ServerCalls.UnaryMethod<
io.grpc.testing.integration.Messages.SimpleRequest,
io.grpc.testing.integration.Messages.SimpleResponse>() {
@java.lang.Override
@ -358,8 +360,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_OUTPUT_CALL,
asyncUnaryRequestCall(
new io.grpc.stub.ServerCalls.UnaryRequestMethod<
asyncServerStreamingCall(
new io.grpc.stub.ServerCalls.ServerStreamingMethod<
io.grpc.testing.integration.Messages.StreamingOutputCallRequest,
io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() {
@java.lang.Override
@ -371,8 +373,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_STREAMING_INPUT_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncClientStreamingCall(
new io.grpc.stub.ServerCalls.ClientStreamingMethod<
io.grpc.testing.integration.Messages.StreamingInputCallRequest,
io.grpc.testing.integration.Messages.StreamingInputCallResponse>() {
@java.lang.Override
@ -383,8 +385,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_FULL_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Messages.StreamingOutputCallRequest,
io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() {
@java.lang.Override
@ -395,8 +397,8 @@ public class TestServiceGrpc {
})))
.addMethod(io.grpc.ServerMethodDefinition.create(
METHOD_HALF_DUPLEX_CALL,
asyncStreamingRequestCall(
new io.grpc.stub.ServerCalls.StreamingRequestMethod<
asyncDuplexStreamingCall(
new io.grpc.stub.ServerCalls.DuplexStreamingMethod<
io.grpc.testing.integration.Messages.StreamingOutputCallRequest,
io.grpc.testing.integration.Messages.StreamingOutputCallResponse>() {
@java.lang.Override

View File

@ -45,19 +45,88 @@ public class ServerCalls {
private ServerCalls() {
}
/**
* Creates a {@code ServerCallHandler} for a unary call method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
final UnaryMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
}
/**
* Creates a {@code ServerCallHandler} for a server streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
final ServerStreamingMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method);
}
/**
* Creates a {@code ServerCallHandler} for a client streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
final ClientStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
}
/**
* Creates a {@code ServerCallHandler} for a duplex streaming method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncDuplexStreamingCall(
final DuplexStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method);
}
/**
* Adaptor to a unary call method.
*/
public static interface UnaryMethod<ReqT, RespT> extends UnaryRequestMethod<ReqT, RespT> {
}
/**
* Adaptor to a server streaming method.
*/
public static interface ServerStreamingMethod<ReqT, RespT>
extends UnaryRequestMethod<ReqT, RespT> {
}
/**
* Adaptor to a client streaming method.
*/
public static interface ClientStreamingMethod<ReqT, RespT>
extends StreamingRequestMethod<ReqT, RespT> {
}
/**
* Adaptor to a bi-directional streaming method.
*/
public static interface DuplexStreamingMethod<ReqT, RespT>
extends StreamingRequestMethod<ReqT, RespT> {
}
/**
* Creates a {@code ServerCallHandler} for a unary request call method of the service.
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall(
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryRequestCall(
final UnaryRequestMethod<ReqT, RespT> method) {
return new ServerCallHandler<ReqT, RespT>() {
@Override
public ServerCall.Listener<ReqT> startCall(
String fullMethodName, final ServerCall<RespT> call, Metadata.Headers headers) {
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
call.request(1);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, we will catch it in onPayload() and emit INVALID_ARGUMENT.
call.request(2);
return new EmptyServerCallListener<ReqT>() {
ReqT request;
@Override
@ -66,9 +135,6 @@ public class ServerCalls {
// We delay calling method.invoke() until onHalfClose(), because application may call
// close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose().
this.request = request;
// Request delivery of the next inbound message.
call.request(1);
} else {
call.close(
Status.INVALID_ARGUMENT.withDescription(
@ -101,7 +167,7 @@ public class ServerCalls {
*
* @param method an adaptor to the actual method on the service implementation.
*/
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall(
private static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncStreamingRequestCall(
final StreamingRequestMethod<ReqT, RespT> method) {
return new ServerCallHandler<ReqT, RespT>() {
@Override
@ -139,17 +205,11 @@ public class ServerCalls {
};
}
/**
* Adaptor to a unary call or server streaming method.
*/
public static interface UnaryRequestMethod<ReqT, RespT> {
private static interface UnaryRequestMethod<ReqT, RespT> {
void invoke(ReqT request, StreamObserver<RespT> responseObserver);
}
/**
* Adaptor to a client stremaing or bi-directional stremaing method.
*/
public static interface StreamingRequestMethod<ReqT, RespT> {
private static interface StreamingRequestMethod<ReqT, RespT> {
StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
}