diff --git a/stub/src/main/java/io/grpc/stub/Calls.java b/stub/src/main/java/io/grpc/stub/Calls.java index 2459bb6d8e..71a3ae758c 100644 --- a/stub/src/main/java/io/grpc/stub/Calls.java +++ b/stub/src/main/java/io/grpc/stub/Calls.java @@ -45,10 +45,11 @@ import io.grpc.Status; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; /** @@ -148,7 +149,7 @@ public class Calls { // TODO(lryan): Not clear if we want to use this idiom for 'simple' stubs. public static Iterator blockingServerStreamingCall( Call call, ReqT param) { - BlockingResponseStream result = new BlockingResponseStream(); + BlockingResponseStream result = new BlockingResponseStream(call); asyncServerStreamingCall(call, param, result.listener()); return result; } @@ -331,11 +332,17 @@ public class Calls { */ // TODO(ejona): determine how to allow Call.cancel() in case of application error. private static class BlockingResponseStream implements Iterator { - private final LinkedBlockingQueue buffer = new LinkedBlockingQueue(); + // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. + private final BlockingQueue buffer = new ArrayBlockingQueue(2); private final Call.Listener listener = new QueuingListener(); + private final Call call; // Only accessed when iterating. private Object last; + private BlockingResponseStream(Call call) { + this.call = call; + } + Call.Listener listener() { return listener; } @@ -362,6 +369,7 @@ public class Calls { throw new NoSuchElementException(); } try { + call.request(1); @SuppressWarnings("unchecked") T tmp = (T) last; return tmp;