diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 6c957ba165..be484d18c5 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -117,7 +117,7 @@ public class ClientCalls { public static Iterator blockingServerStreamingCall( ClientCall call, ReqT param) { BlockingResponseStream result = new BlockingResponseStream(call); - asyncUnaryRequestCall(call, param, result.listener()); + asyncUnaryRequestCall(call, param, result.listener(), true); return result; } @@ -130,7 +130,7 @@ public class ClientCalls { ClientCall call, ReqT param) { GrpcFuture responseFuture = new GrpcFuture(call); - asyncUnaryRequestCall(call, param, new UnaryStreamToFuture(responseFuture)); + asyncUnaryRequestCall(call, param, new UnaryStreamToFuture(responseFuture), false); return responseFuture; } @@ -159,17 +159,16 @@ public class ClientCalls { ClientCall call, ReqT param, StreamObserver responseObserver, boolean streamingResponse) { asyncUnaryRequestCall(call, param, - new StreamObserverToCallListenerAdapter(call, responseObserver, streamingResponse)); + new StreamObserverToCallListenerAdapter(call, responseObserver, streamingResponse), + streamingResponse); } private static void asyncUnaryRequestCall( ClientCall call, ReqT param, - ClientCall.Listener responseListener) { - call.start(responseListener, new Metadata.Headers()); - // Initially ask for two responses from flow-control so that if a misbehaving server sends more - // than one responses, we can catch it. - call.request(2); + ClientCall.Listener responseListener, + boolean streamingResponse) { + startCall(call, responseListener, streamingResponse); try { call.sendPayload(param); call.halfClose(); @@ -182,14 +181,23 @@ public class ClientCalls { private static StreamObserver asyncStreamingRequestCall( ClientCall call, StreamObserver responseObserver, boolean streamingResponse) { - call.start(new StreamObserverToCallListenerAdapter( - call, responseObserver, streamingResponse), new Metadata.Headers()); - // Initially ask for two responses from flow-control so that if a misbehaving server sends more - // than one responses, we can catch it. - call.request(2); + startCall(call, new StreamObserverToCallListenerAdapter( + call, responseObserver, streamingResponse), streamingResponse); return new CallToStreamObserverAdapter(call); } + private static void startCall(ClientCall call, + ClientCall.Listener responseListener, boolean streamingResponse) { + call.start(responseListener, new Metadata.Headers()); + if (streamingResponse) { + call.request(1); + } else { + // Initially ask for two responses from flow-control so that if a misbehaving server sends + // more than one responses, we can catch it and fail it in the listener. + call.request(2); + } + } + private static class CallToStreamObserverAdapter implements StreamObserver { private final ClientCall call;