mirror of https://github.com/grpc/grpc-java.git
stub: Reorder blocking request()s for message throughput
This can provide a ~2x performance increase to Netty and 40% increase for OkHttp. Netty async saw a ~3x gain from MigratingDeframer, so blocking trails behind a bit. But OkHttp's async gains from MigratingDeframer were also 40%, so this provides the same gain to blocking.
This commit is contained in:
parent
295d927d8b
commit
613439c97e
|
|
@ -577,8 +577,9 @@ public final class ClientCalls {
|
||||||
*/
|
*/
|
||||||
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
|
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
|
||||||
private static final class BlockingResponseStream<T> implements Iterator<T> {
|
private static final class BlockingResponseStream<T> implements Iterator<T> {
|
||||||
// Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
|
// Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
|
||||||
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(2);
|
// (2 for value, not 1, because of early request() in next())
|
||||||
|
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
|
||||||
private final StartableListener<T> listener = new QueuingListener();
|
private final StartableListener<T> listener = new QueuingListener();
|
||||||
private final ClientCall<?, T> call;
|
private final ClientCall<?, T> call;
|
||||||
/** May be null. */
|
/** May be null. */
|
||||||
|
|
@ -651,17 +652,20 @@ public final class ClientCalls {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T next() {
|
public T next() {
|
||||||
|
// Eagerly call request(1) so it can be processing the next message while we wait for the
|
||||||
|
// current one, which reduces latency for the next message. With MigratingThreadDeframer and
|
||||||
|
// if the data has already been recieved, every other message can be delivered instantly. This
|
||||||
|
// can be run after hasNext(), but just would be slower.
|
||||||
|
if (!(last instanceof StatusRuntimeException) && last != this) {
|
||||||
|
call.request(1);
|
||||||
|
}
|
||||||
if (!hasNext()) {
|
if (!hasNext()) {
|
||||||
throw new NoSuchElementException();
|
throw new NoSuchElementException();
|
||||||
}
|
}
|
||||||
try {
|
@SuppressWarnings("unchecked")
|
||||||
call.request(1);
|
T tmp = (T) last;
|
||||||
@SuppressWarnings("unchecked")
|
last = null;
|
||||||
T tmp = (T) last;
|
return tmp;
|
||||||
return tmp;
|
|
||||||
} finally {
|
|
||||||
last = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue