From fc3e41674bfdc096d659e06485535e70c8aa349e Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 27 Mar 2015 17:12:55 -0700 Subject: [PATCH] Propagate explicit flushes through MessageFramer MessageFramer allows queing of data and explicit flushing. Sinks generally can benefit from knowing when they are required to flush, so we now tell them when MessageFramer received a flush so they only have to flush when required. --- .../grpc/transport/AbstractClientStream.java | 7 +++--- .../grpc/transport/AbstractServerStream.java | 7 +++--- .../io/grpc/transport/AbstractStream.java | 8 ++++--- .../java/io/grpc/transport/MessageFramer.java | 13 +++++----- .../io/grpc/transport/MessageFramerTest.java | 24 +++++++++---------- .../transport/netty/NettyClientStream.java | 7 ++++-- .../transport/netty/NettyServerStream.java | 7 ++++-- .../netty/NettyClientStreamTest.java | 3 ++- .../netty/NettyServerHandlerTest.java | 2 +- .../netty/NettyServerStreamTest.java | 3 ++- .../transport/okhttp/OkHttpClientStream.java | 4 ++-- .../okhttp/OutboundFlowController.java | 13 +++++++--- 12 files changed, 59 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index 5d7d3a8f1f..d896c1bb5f 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -183,8 +183,8 @@ public abstract class AbstractClientStream extends AbstractStream } @Override - protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { - sendFrame(frame, endOfStream); + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + sendFrame(frame, endOfStream, flush); } /** @@ -193,8 +193,9 @@ public abstract class AbstractClientStream extends AbstractStream * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush); /** * Report stream closure with status to the application layer if not already reported. This method diff --git a/core/src/main/java/io/grpc/transport/AbstractServerStream.java b/core/src/main/java/io/grpc/transport/AbstractServerStream.java index 43b34ed4bd..eae5bd3351 100644 --- a/core/src/main/java/io/grpc/transport/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractServerStream.java @@ -141,9 +141,9 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { if (frame.readableBytes() > 0) { - sendFrame(frame, false); + sendFrame(frame, false, endOfStream ? false : flush); } if (endOfStream) { sendTrailers(stashedTrailers, headersSent); @@ -165,8 +165,9 @@ public abstract class AbstractServerStream extends AbstractStream * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush); /** * Sends trailers to the remote end point. This call implies end of stream. diff --git a/core/src/main/java/io/grpc/transport/AbstractStream.java b/core/src/main/java/io/grpc/transport/AbstractStream.java index 9c21b5d1b3..abf5de3ae7 100644 --- a/core/src/main/java/io/grpc/transport/AbstractStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractStream.java @@ -87,8 +87,8 @@ public abstract class AbstractStream implements Stream { }; MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() { @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream) { - internalSendFrame(frame, endOfStream); + public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + internalSendFrame(frame, endOfStream, flush); } }; @@ -155,8 +155,10 @@ public abstract class AbstractStream implements Stream { * @param frame a buffer containing the chunk of data to be sent. * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. + * @param flush {@code true} if more data may not be arriving soon */ - protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream); + protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream, + boolean flush); /** * Handles a message that was just deframed. diff --git a/core/src/main/java/io/grpc/transport/MessageFramer.java b/core/src/main/java/io/grpc/transport/MessageFramer.java index 31f27a69f4..1469f37103 100644 --- a/core/src/main/java/io/grpc/transport/MessageFramer.java +++ b/core/src/main/java/io/grpc/transport/MessageFramer.java @@ -61,8 +61,9 @@ public class MessageFramer { * * @param frame the contents of the frame to deliver * @param endOfStream whether the frame is the last one for the GRPC stream + * @param flush {@code true} if more data may not be arriving soon */ - void deliverFrame(WritableBuffer frame, boolean endOfStream); + void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush); } private static final int HEADER_LENGTH = 5; @@ -178,7 +179,7 @@ public class MessageFramer { private void writeRaw(byte[] b, int off, int len) { while (len > 0) { if (buffer != null && buffer.writableBytes() == 0) { - commitToSink(false); + commitToSink(false, false); } if (buffer == null) { buffer = bufferAllocator.allocate(maxFrameSize); @@ -195,7 +196,7 @@ public class MessageFramer { */ public void flush() { if (buffer != null && buffer.readableBytes() > 0) { - commitToSink(false); + commitToSink(false, true); } } @@ -213,7 +214,7 @@ public class MessageFramer { */ public void close() { if (!isClosed()) { - commitToSink(true); + commitToSink(true, true); closed = true; } } @@ -230,11 +231,11 @@ public class MessageFramer { } } - private void commitToSink(boolean endOfStream) { + private void commitToSink(boolean endOfStream, boolean flush) { if (buffer == null) { buffer = bufferAllocator.allocate(0); } - sink.deliverFrame(buffer, endOfStream); + sink.deliverFrame(buffer, endOfStream, flush); buffer = null; } diff --git a/core/src/test/java/io/grpc/transport/MessageFramerTest.java b/core/src/test/java/io/grpc/transport/MessageFramerTest.java index f209e8dc53..1d1975e983 100644 --- a/core/src/test/java/io/grpc/transport/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/transport/MessageFramerTest.java @@ -78,7 +78,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -90,7 +90,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.flush(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -100,14 +100,14 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.close(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true); + toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true); verifyNoMoreInteractions(sink); } @Test public void closeWithoutBufferedFrameGivesEmptySink() { framer.close(); - verify(sink).deliverFrame(new ByteWritableBuffer(0), true); + verify(sink).deliverFrame(new ByteWritableBuffer(0), true, true); verifyNoMoreInteractions(sink); } @@ -115,11 +115,11 @@ public class MessageFramerTest { public void payloadSplitBetweenSinks() { writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true); verifyNoMoreInteractions(sink); } @@ -128,11 +128,11 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14, 1}); writePayload(framer, new byte[] {3}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false, true); verifyNoMoreInteractions(sink); } @@ -140,7 +140,7 @@ public class MessageFramerTest { public void emptyPayloadYieldsFrame() throws Exception { writePayload(framer, new byte[0]); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); } @Test @@ -148,7 +148,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); framer.flush(); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); } @@ -158,7 +158,7 @@ public class MessageFramerTest { MessageFramer framer = new MessageFramer(sink, allocator, transportFrameSize); writePayload(framer, new byte[1000]); framer.flush(); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); ByteWritableBuffer buffer = frameCaptor.getValue(); assertEquals(1005, buffer.size()); @@ -177,7 +177,7 @@ public class MessageFramerTest { new MessageFramer(sink, allocator, transportFrameSize, Compression.GZIP); writePayload(framer, new byte[1000]); framer.flush(); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); ByteWritableBuffer buffer = frameCaptor.getValue(); // It should have compressed very well. assertTrue(buffer.size() < 100); diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java index 639fb4559e..bbc9bcc4a8 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java @@ -115,9 +115,12 @@ class NettyClientStream extends Http2ClientStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); - channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + if (flush) { + channel.flush(); + } } @Override diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java index d81da6e021..6e52d78038 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java @@ -88,9 +88,12 @@ class NettyServerStream extends AbstractServerStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); - channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); + if (flush) { + channel.flush(); + } } @Override diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java index bb261e1c32..3a7c45c4e7 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java @@ -108,7 +108,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase { stream().id(STREAM_ID); stream.writeMessage(input, input.available(), accepted); stream.flush(); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).flush(); verify(accepted).run(); } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java index 5734158056..59dec89172 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java @@ -314,7 +314,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length); MessageFramer framer = new MessageFramer(new MessageFramer.Sink() { @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream) { + public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { ByteBuf bytebuf = ((NettyWritableBuffer)frame).bytebuf(); compressionFrame.writeBytes(bytebuf); } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java index 831c045c7f..1edb0f0ea6 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java @@ -85,7 +85,8 @@ public class NettyServerStreamTest extends NettyStreamTestBase { .status(Utils.STATUS_OK) .set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC); verify(channel).writeAndFlush(new SendResponseHeadersCommand(STREAM_ID, headers, false)); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false)); + verify(channel).flush(); verify(accepted).run(); } diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java index 456870953e..a2954ff8e2 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java @@ -143,14 +143,14 @@ class OkHttpClientStream extends Http2ClientStream { } @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { checkState(id() != 0, "streamId should be set"); Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); // Write the data to the remote endpoint. // Per http2 SPEC, the max data length should be larger than 64K, while our frame size is // only 4K. checkState(buffer.size() < frameWriter.maxDataLength()); - outboundFlow.data(endOfStream, id(), buffer); + outboundFlow.data(endOfStream, id(), buffer, flush); } @Override diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java index 2603472ecb..8d360804e0 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OutboundFlowController.java @@ -101,7 +101,7 @@ class OutboundFlowController { } } - synchronized void data(boolean outFinished, int streamId, Buffer source) { + synchronized void data(boolean outFinished, int streamId, Buffer source, boolean flush) { Preconditions.checkNotNull(source, "source"); if (streamId <= 0) { throw new IllegalArgumentException("streamId must be > 0"); @@ -115,7 +115,9 @@ class OutboundFlowController { if (!framesAlreadyQueued && window >= frame.size()) { // Window size is large enough to send entire data frame frame.write(); - flush(); + if (flush) { + flush(); + } return; } @@ -124,12 +126,17 @@ class OutboundFlowController { if (framesAlreadyQueued || window <= 0) { // Stream already has frames pending or is stalled, don't send anything now. + if (flush) { + flush(); + } return; } // Create and send a partial frame up to the window size. frame.split(window).write(); - flush(); + if (flush) { + flush(); + } } private void flush() {