Propagate explicit flushes through MessageFramer

MessageFramer allows queing of data and explicit flushing. Sinks
generally can benefit from knowing when they are required to flush, so
we now tell them when MessageFramer received a flush so they only have
to flush when required.
This commit is contained in:
Eric Anderson 2015-03-27 17:12:55 -07:00
parent af18876713
commit fc3e41674b
12 changed files with 59 additions and 39 deletions

View File

@ -183,8 +183,8 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
}
@Override
protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
sendFrame(frame, endOfStream, flush);
}
/**
@ -193,8 +193,9 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
* @param flush {@code true} if more data may not be arriving soon
*/
protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream);
protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush);
/**
* Report stream closure with status to the application layer if not already reported. This method

View File

@ -141,9 +141,9 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
}
@Override
protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) {
protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
if (frame.readableBytes() > 0) {
sendFrame(frame, false);
sendFrame(frame, false, endOfStream ? false : flush);
}
if (endOfStream) {
sendTrailers(stashedTrailers, headersSent);
@ -165,8 +165,9 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
* @param flush {@code true} if more data may not be arriving soon
*/
protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream);
protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush);
/**
* Sends trailers to the remote end point. This call implies end of stream.

View File

@ -87,8 +87,8 @@ public abstract class AbstractStream<IdT> implements Stream {
};
MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() {
@Override
public void deliverFrame(WritableBuffer frame, boolean endOfStream) {
internalSendFrame(frame, endOfStream);
public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
internalSendFrame(frame, endOfStream, flush);
}
};
@ -155,8 +155,10 @@ public abstract class AbstractStream<IdT> implements Stream {
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
* @param flush {@code true} if more data may not be arriving soon
*/
protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream);
protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream,
boolean flush);
/**
* Handles a message that was just deframed.

View File

@ -61,8 +61,9 @@ public class MessageFramer {
*
* @param frame the contents of the frame to deliver
* @param endOfStream whether the frame is the last one for the GRPC stream
* @param flush {@code true} if more data may not be arriving soon
*/
void deliverFrame(WritableBuffer frame, boolean endOfStream);
void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush);
}
private static final int HEADER_LENGTH = 5;
@ -178,7 +179,7 @@ public class MessageFramer {
private void writeRaw(byte[] b, int off, int len) {
while (len > 0) {
if (buffer != null && buffer.writableBytes() == 0) {
commitToSink(false);
commitToSink(false, false);
}
if (buffer == null) {
buffer = bufferAllocator.allocate(maxFrameSize);
@ -195,7 +196,7 @@ public class MessageFramer {
*/
public void flush() {
if (buffer != null && buffer.readableBytes() > 0) {
commitToSink(false);
commitToSink(false, true);
}
}
@ -213,7 +214,7 @@ public class MessageFramer {
*/
public void close() {
if (!isClosed()) {
commitToSink(true);
commitToSink(true, true);
closed = true;
}
}
@ -230,11 +231,11 @@ public class MessageFramer {
}
}
private void commitToSink(boolean endOfStream) {
private void commitToSink(boolean endOfStream, boolean flush) {
if (buffer == null) {
buffer = bufferAllocator.allocate(0);
}
sink.deliverFrame(buffer, endOfStream);
sink.deliverFrame(buffer, endOfStream, flush);
buffer = null;
}

View File

@ -78,7 +78,7 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14});
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
verifyNoMoreInteractions(sink);
}
@ -90,7 +90,7 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false);
toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true);
verifyNoMoreInteractions(sink);
}
@ -100,14 +100,14 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
framer.close();
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true);
toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true);
verifyNoMoreInteractions(sink);
}
@Test
public void closeWithoutBufferedFrameGivesEmptySink() {
framer.close();
verify(sink).deliverFrame(new ByteWritableBuffer(0), true);
verify(sink).deliverFrame(new ByteWritableBuffer(0), true, true);
verifyNoMoreInteractions(sink);
}
@ -115,11 +115,11 @@ public class MessageFramerTest {
public void payloadSplitBetweenSinks() {
writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5});
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false);
toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false);
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true);
verifyNoMoreInteractions(sink);
}
@ -128,11 +128,11 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14, 1});
writePayload(framer, new byte[] {3});
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false);
toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false);
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false, true);
verifyNoMoreInteractions(sink);
}
@ -140,7 +140,7 @@ public class MessageFramerTest {
public void emptyPayloadYieldsFrame() throws Exception {
writePayload(framer, new byte[0]);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
}
@Test
@ -148,7 +148,7 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14});
framer.flush();
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
verifyNoMoreInteractions(sink);
}
@ -158,7 +158,7 @@ public class MessageFramerTest {
MessageFramer framer = new MessageFramer(sink, allocator, transportFrameSize);
writePayload(framer, new byte[1000]);
framer.flush();
verify(sink).deliverFrame(frameCaptor.capture(), eq(false));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
ByteWritableBuffer buffer = frameCaptor.getValue();
assertEquals(1005, buffer.size());
@ -177,7 +177,7 @@ public class MessageFramerTest {
new MessageFramer(sink, allocator, transportFrameSize, Compression.GZIP);
writePayload(framer, new byte[1000]);
framer.flush();
verify(sink).deliverFrame(frameCaptor.capture(), eq(false));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
ByteWritableBuffer buffer = frameCaptor.getValue();
// It should have compressed very well.
assertTrue(buffer.size() < 100);

View File

@ -115,9 +115,12 @@ class NettyClientStream extends Http2ClientStream {
}
@Override
protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
if (flush) {
channel.flush();
}
}
@Override

View File

@ -88,9 +88,12 @@ class NettyServerStream extends AbstractServerStream<Integer> {
}
@Override
protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
channel.write(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
if (flush) {
channel.flush();
}
}
@Override

View File

@ -108,7 +108,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
stream().id(STREAM_ID);
stream.writeMessage(input, input.available(), accepted);
stream.flush();
verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false));
verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false));
verify(channel).flush();
verify(accepted).run();
}

View File

@ -314,7 +314,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length);
MessageFramer framer = new MessageFramer(new MessageFramer.Sink() {
@Override
public void deliverFrame(WritableBuffer frame, boolean endOfStream) {
public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
ByteBuf bytebuf = ((NettyWritableBuffer)frame).bytebuf();
compressionFrame.writeBytes(bytebuf);
}

View File

@ -85,7 +85,8 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
.status(Utils.STATUS_OK)
.set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC);
verify(channel).writeAndFlush(new SendResponseHeadersCommand(STREAM_ID, headers, false));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false));
verify(channel).write(new SendGrpcFrameCommand(stream, messageFrame(MESSAGE), false));
verify(channel).flush();
verify(accepted).run();
}

View File

@ -143,14 +143,14 @@ class OkHttpClientStream extends Http2ClientStream {
}
@Override
protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
checkState(id() != 0, "streamId should be set");
Buffer buffer = ((OkHttpWritableBuffer) frame).buffer();
// Write the data to the remote endpoint.
// Per http2 SPEC, the max data length should be larger than 64K, while our frame size is
// only 4K.
checkState(buffer.size() < frameWriter.maxDataLength());
outboundFlow.data(endOfStream, id(), buffer);
outboundFlow.data(endOfStream, id(), buffer, flush);
}
@Override

View File

@ -101,7 +101,7 @@ class OutboundFlowController {
}
}
synchronized void data(boolean outFinished, int streamId, Buffer source) {
synchronized void data(boolean outFinished, int streamId, Buffer source, boolean flush) {
Preconditions.checkNotNull(source, "source");
if (streamId <= 0) {
throw new IllegalArgumentException("streamId must be > 0");
@ -115,7 +115,9 @@ class OutboundFlowController {
if (!framesAlreadyQueued && window >= frame.size()) {
// Window size is large enough to send entire data frame
frame.write();
if (flush) {
flush();
}
return;
}
@ -124,13 +126,18 @@ class OutboundFlowController {
if (framesAlreadyQueued || window <= 0) {
// Stream already has frames pending or is stalled, don't send anything now.
if (flush) {
flush();
}
return;
}
// Create and send a partial frame up to the window size.
frame.split(window).write();
if (flush) {
flush();
}
}
private void flush() {
try {