Fix inbound flow control for Calls.blockingServerStreamingCall

asyncServerStreamingCall provides the initial request(1).

Fixes #93
This commit is contained in:
Eric Anderson 2015-02-18 16:17:56 -08:00
parent 161ac95300
commit 746eccaced
1 changed files with 11 additions and 3 deletions

View File

@ -45,10 +45,11 @@ import io.grpc.Status;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -148,7 +149,7 @@ public class Calls {
// TODO(lryan): Not clear if we want to use this idiom for 'simple' stubs. // TODO(lryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Call<ReqT, RespT> call, ReqT param) { Call<ReqT, RespT> call, ReqT param) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(); BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call);
asyncServerStreamingCall(call, param, result.listener()); asyncServerStreamingCall(call, param, result.listener());
return result; return result;
} }
@ -331,11 +332,17 @@ public class Calls {
*/ */
// TODO(ejona): determine how to allow Call.cancel() in case of application error. // TODO(ejona): determine how to allow Call.cancel() in case of application error.
private static class BlockingResponseStream<T> implements Iterator<T> { private static class BlockingResponseStream<T> implements Iterator<T> {
private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>(); // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2);
private final Call.Listener<T> listener = new QueuingListener(); private final Call.Listener<T> listener = new QueuingListener();
private final Call<?, T> call;
// Only accessed when iterating. // Only accessed when iterating.
private Object last; private Object last;
private BlockingResponseStream(Call<?, T> call) {
this.call = call;
}
Call.Listener<T> listener() { Call.Listener<T> listener() {
return listener; return listener;
} }
@ -362,6 +369,7 @@ public class Calls {
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
try { try {
call.request(1);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T tmp = (T) last; T tmp = (T) last;
return tmp; return tmp;