Optimize for flow-control: only request for 2 responses in unary-response cases

This commit is contained in:
Kun Zhang 2015-07-17 09:07:32 -07:00
parent d28535bb20
commit b80abef13f
1 changed files with 21 additions and 13 deletions

View File

@ -117,7 +117,7 @@ public class ClientCalls {
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT param) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call);
asyncUnaryRequestCall(call, param, result.listener());
asyncUnaryRequestCall(call, param, result.listener(), true);
return result;
}
@ -130,7 +130,7 @@ public class ClientCalls {
ClientCall<ReqT, RespT> call,
ReqT param) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture));
asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false);
return responseFuture;
}
@ -159,17 +159,16 @@ public class ClientCalls {
ClientCall<ReqT, RespT> call, ReqT param, StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
asyncUnaryRequestCall(call, param,
new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver, streamingResponse));
new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver, streamingResponse),
streamingResponse);
}
private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT param,
ClientCall.Listener<RespT> responseListener) {
call.start(responseListener, new Metadata.Headers());
// Initially ask for two responses from flow-control so that if a misbehaving server sends more
// than one responses, we can catch it.
call.request(2);
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
startCall(call, responseListener, streamingResponse);
try {
call.sendPayload(param);
call.halfClose();
@ -182,14 +181,23 @@ public class ClientCalls {
private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
call.start(new StreamObserverToCallListenerAdapter<RespT>(
call, responseObserver, streamingResponse), new Metadata.Headers());
// Initially ask for two responses from flow-control so that if a misbehaving server sends more
// than one responses, we can catch it.
call.request(2);
startCall(call, new StreamObserverToCallListenerAdapter<RespT>(
call, responseObserver, streamingResponse), streamingResponse);
return new CallToStreamObserverAdapter<ReqT>(call);
}
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
call.start(responseListener, new Metadata.Headers());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
}
private static class CallToStreamObserverAdapter<T> implements StreamObserver<T> {
private final ClientCall<T, ?> call;