diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index 5d7d3a8f1f..d896c1bb5f 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -183,8 +183,8 @@ public abstract class AbstractClientStream extends AbstractStream } @Override - protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { - sendFrame(frame, endOfStream); + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + sendFrame(frame, endOfStream, flush); } /** @@ -193,8 +193,9 @@ public abstract class AbstractClientStream extends AbstractStream * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush); /** * Report stream closure with status to the application layer if not already reported. This method diff --git a/core/src/main/java/io/grpc/transport/AbstractServerStream.java b/core/src/main/java/io/grpc/transport/AbstractServerStream.java index 43b34ed4bd..eae5bd3351 100644 --- a/core/src/main/java/io/grpc/transport/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractServerStream.java @@ -141,9 +141,9 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { if (frame.readableBytes() > 0) { - sendFrame(frame, false); + sendFrame(frame, false, endOfStream ? false : flush); } if (endOfStream) { sendTrailers(stashedTrailers, headersSent); @@ -165,8 +165,9 @@ public abstract class AbstractServerStream extends AbstractStream * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush); /** * Sends trailers to the remote end point. This call implies end of stream. diff --git a/core/src/main/java/io/grpc/transport/AbstractStream.java b/core/src/main/java/io/grpc/transport/AbstractStream.java index 9c21b5d1b3..abf5de3ae7 100644 --- a/core/src/main/java/io/grpc/transport/AbstractStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractStream.java @@ -87,8 +87,8 @@ public abstract class AbstractStream implements Stream { }; MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() { @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream) { - internalSendFrame(frame, endOfStream); + public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + internalSendFrame(frame, endOfStream, flush); } }; @@ -155,8 +155,10 @@ public abstract class AbstractStream implements Stream { * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream, + boolean flush); /** * Handles a message that was just deframed. diff --git a/core/src/main/java/io/grpc/transport/MessageFramer.java b/core/src/main/java/io/grpc/transport/MessageFramer.java index 31f27a69f4..1469f37103 100644 --- a/core/src/main/java/io/grpc/transport/MessageFramer.java +++ b/core/src/main/java/io/grpc/transport/MessageFramer.java @@ -61,8 +61,9 @@ public class MessageFramer { * * @param frame the contents of the frame to deliver * @param endOfStream whether the frame is the last one for the GRPC stream + * @param flush {@code true} if more data may not be arriving soon */ - void deliverFrame(WritableBuffer frame, boolean endOfStream); + void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush); } private static final int HEADER_LENGTH = 5; @@ -178,7 +179,7 @@ public class MessageFramer { private void writeRaw(byte[] b, int off, int len) { while (len > 0) { if (buffer != null && buffer.writableBytes() == 0) { - commitToSink(false); + commitToSink(false, false); } if (buffer == null) { buffer = bufferAllocator.allocate(maxFrameSize); @@ -195,7 +196,7 @@ public class MessageFramer { */ public void flush() { if (buffer != null && buffer.readableBytes() > 0) { - commitToSink(false); + commitToSink(false, true); } } @@ -213,7 +214,7 @@ public class MessageFramer { */ public void close() { if (!isClosed()) { - commitToSink(true); + commitToSink(true, true); closed = true; } } @@ -230,11 +231,11 @@ public class MessageFramer { } } - private void commitToSink(boolean endOfStream) { + private void commitToSink(boolean endOfStream, boolean flush) { if (buffer == null) { buffer = bufferAllocator.allocate(0); } - sink.deliverFrame(buffer, endOfStream); + sink.deliverFrame(buffer, endOfStream, flush); buffer = null; } diff --git a/core/src/test/java/io/grpc/transport/MessageFramerTest.java b/core/src/test/java/io/grpc/transport/MessageFramerTest.java index f209e8dc53..1d1975e983 100644 --- a/core/src/test/java/io/grpc/transport/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/transport/MessageFramerTest.java @@ -78,7 +78,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -90,7 +90,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.flush(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -100,14 +100,14 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.close(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true); + toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true); verifyNoMoreInteractions(sink); } @Test public void closeWithoutBufferedFrameGivesEmptySink() { framer.close(); - verify(sink).deliverFrame(new ByteWritableBuffer(0), true); + verify(sink).deliverFrame(new ByteWritableBuffer(0), true, true); verifyNoMoreInteractions(sink); } @@ -115,11 +115,11 @@ public class MessageFramerTest { public void payloadSplitBetweenSinks() { writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true); verifyNoMoreInteractions(sink); } @@ -128,11 +128,11 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14, 1}); writePayload(framer, new byte[] {3}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false, true); verifyNoMoreInteractions(sink); } @@ -140,7 +140,7 @@ public class MessageFramerTest { public void emptyPayloadYieldsFrame() throws Exception { writePayload(framer, new byte[0]); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); } @Test @@ -148,7 +148,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); framer.flush(); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -158,7 +158,7 @@ public class MessageFramerTest { MessageFramer framer = new MessageFramer(sink, allocator, transportFrameSize); writePayload(framer, new byte[1000]); framer.flush(); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); ByteWritableBuffer buffer = frameCaptor.getValue(); assertEquals(1005, buffer.size()); @@ -177,7 +177,7 @@ public class MessageFramerTest { new MessageFramer(sink, allocator, transportFrameSize, Compression.GZIP); writePayload(framer, new byte[1000]); framer.flush(); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); ByteWritableBuffer buffer = frameCaptor.getValue(); // It should have compressed very well. assertTrue(buffer.size() < 100); diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java index 639fb4559e..bbc9bcc4a8 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java @@ -115,9 +115,12 @@ class NettyClientStream extends Http2ClientStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); - channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + if (flush) { + channel.flush(); + } } @Override diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java index d81da6e021..6e52d78038 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java @@ -88,9 +88,12 @@ class NettyServerStream extends AbstractServerStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); - channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + if (flush) { + channel.flush(); + } } @Override diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java index bb261e1c32..3a7c45c4e7 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java @@ -108,7 +108,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase { stream().id(STREAM_ID); stream.writeMessage(input, input.available(), accepted); stream.flush(); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).flush(); verify(accepted).run(); } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java index 5734158056..59dec89172 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java @@ -314,7 +314,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length); MessageFramer framer = new MessageFramer(new MessageFramer.Sink() { @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream) { + public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer)frame).bytebuf(); compressionFrame.writeBytes(bytebuf); } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java index 831c045c7f..1edb0f0ea6 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java @@ -85,7 +85,8 @@ public class NettyServerStreamTest extends NettyStreamTestBase { .status(Utils.STATUS_OK) .set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC); verify(channel).writeAndFlush(new SendResponseHeadersCommand(STREAM_ID, headers, false)); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).flush(); verify(accepted).run(); } diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java index 456870953e..a2954ff8e2 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java @@ -143,14 +143,14 @@ class OkHttpClientStream extends Http2ClientStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { checkState(id() != 0, "streamId should be set"); Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); // Write the data to the remote endpoint. // Per http2 SPEC, the max data length should be larger than 64K, while our frame size is // only 4K. checkState(buffer.size() < frameWriter.maxDataLength()); - outboundFlow.data(endOfStream, id(), buffer); + outboundFlow.data(endOfStream, id(), buffer, flush); } @Override diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java index 2603472ecb..8d360804e0 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java @@ -101,7 +101,7 @@ class OutboundFlowController { } } - synchronized void data(boolean outFinished, int streamId, Buffer source) { + synchronized void data(boolean outFinished, int streamId, Buffer source, boolean flush) { Preconditions.checkNotNull(source, "source"); if (streamId <= 0) { throw new IllegalArgumentException("streamId must be > 0"); @@ -115,7 +115,9 @@ class OutboundFlowController { if (!framesAlreadyQueued && window >= frame.size()) { // Window size is large enough to send entire data frame frame.write(); - flush(); + if (flush) { + flush(); + } return; } @@ -124,12 +126,17 @@ class OutboundFlowController { if (framesAlreadyQueued || window <= 0) { // Stream already has frames pending or is stalled, don't send anything now. + if (flush) { + flush(); + } return; } // Create and send a partial frame up to the window size. frame.split(window).write(); - flush(); + if (flush) { + flush(); + } } private void flush() {