netty: always flush at least once, even if there were no writes

This commit is contained in:
Carl Mastrangelo 2016-07-19 10:49:54 -07:00
parent 083ea8e6aa
commit 297af4425e
3 changed files with 5 additions and 4 deletions

View File

@ -121,6 +121,7 @@ class WriteQueue {
try { try {
QueuedCommand cmd; QueuedCommand cmd;
int i = 0; int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) { while ((cmd = queue.poll()) != null) {
channel.write(cmd, cmd.promise()); channel.write(cmd, cmd.promise());
if (++i == DEQUE_CHUNK_SIZE) { if (++i == DEQUE_CHUNK_SIZE) {
@ -129,10 +130,11 @@ class WriteQueue {
// might never end as new events are continuously added to the queue, if we never // might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM. // flushed in that case we would be guaranteed to OOM.
channel.flush(); channel.flush();
flushedOnce = true;
} }
} }
// Must flush at least once // Must flush at least once, even if there were no writes.
if (i != 0) { if (i != 0 || !flushedOnce) {
channel.flush(); channel.flush();
} }
} finally { } finally {

View File

@ -71,7 +71,7 @@ public class WriteQueueTest {
public ChannelPromise promise; public ChannelPromise promise;
private long writeCalledNanos; private long writeCalledNanos;
private long flushCalledNanos = writeCalledNanos + 1; private long flushCalledNanos = writeCalledNanos;
/** /**
* Set up for test. * Set up for test.

View File

@ -40,7 +40,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ExperimentalApi;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;