diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index f946b3bcc8..0266cb7d9a 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -577,8 +577,9 @@ public final class ClientCalls { */ // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error. private static final class BlockingResponseStream implements Iterator { - // Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close. - private final BlockingQueue buffer = new ArrayBlockingQueue<>(2); + // Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close. + // (2 for value, not 1, because of early request() in next()) + private final BlockingQueue buffer = new ArrayBlockingQueue<>(3); private final StartableListener listener = new QueuingListener(); private final ClientCall call; /** May be null. */ @@ -651,17 +652,20 @@ public final class ClientCalls { @Override public T next() { + // Eagerly call request(1) so it can be processing the next message while we wait for the + // current one, which reduces latency for the next message. With MigratingThreadDeframer and + // if the data has already been recieved, every other message can be delivered instantly. This + // can be run after hasNext(), but just would be slower. + if (!(last instanceof StatusRuntimeException) && last != this) { + call.request(1); + } if (!hasNext()) { throw new NoSuchElementException(); } - try { - call.request(1); - @SuppressWarnings("unchecked") - T tmp = (T) last; - return tmp; - } finally { - last = null; - } + @SuppressWarnings("unchecked") + T tmp = (T) last; + last = null; + return tmp; } @Override