core,netty,okhttp: move transport tracer outbound counters to transport thread (#3661)

Counters are bumped when a message is completely written. If a
part of a message is still buffered and not yet flushed, we will
not increment the stats.
This commit is contained in:
zpencer 2017-11-06 12:46:17 -08:00 committed by GitHub
parent 1bc7d76d3a
commit 9fac15d4f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 117 additions and 108 deletions

View File

@ -64,8 +64,10 @@ public abstract class AbstractClientStream extends AbstractStream
* @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
* {@code true} if this is {@code true}
* @param flush {@code true} if more data may not be arriving soon
* @Param numMessages the number of messages this series of frames represents
*/
void writeFrame(@Nullable WritableBuffer frame, boolean endOfStream, boolean flush);
void writeFrame(
@Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
/**
* Requests up to the given number of messages from the call to be delivered to the client. This
@ -100,8 +102,7 @@ public abstract class AbstractClientStream extends AbstractStream
Preconditions.checkNotNull(headers, "headers");
this.useGet = useGet;
if (!useGet) {
TransportTracer transportTracer = null; // TODO(zpencer): add tracing on clients
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
this.headers = headers;
} else {
framer = new GetFramer(headers, statsTraceCtx);
@ -158,9 +159,10 @@ public abstract class AbstractClientStream extends AbstractStream
}
@Override
public final void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public final void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
abstractClientStreamSink().writeFrame(frame, endOfStream, flush);
abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
}
@Override

View File

@ -48,8 +48,9 @@ public abstract class AbstractServerStream extends AbstractStream
*
* @param frame a buffer containing the chunk of data to be sent.
* @param flush {@code true} if more data may not be arriving soon
* @param numMessages the number of messages this frame represents
*/
void writeFrame(@Nullable WritableBuffer frame, boolean flush);
void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages);
/**
* Sends trailers to the remote end point. This call implies end of stream.
@ -77,17 +78,13 @@ public abstract class AbstractServerStream extends AbstractStream
private final MessageFramer framer;
private final StatsTraceContext statsTraceCtx;
private final TransportTracer transportTracer;
private boolean outboundClosed;
private boolean headersSent;
protected AbstractServerStream(
WritableBufferAllocator bufferAllocator,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
}
@Override
@ -118,10 +115,11 @@ public abstract class AbstractServerStream extends AbstractStream
}
@Override
public final void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public final void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
// Since endOfStream is triggered by the sending of trailers, avoid flush here and just flush
// after the trailers.
abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush);
abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush, numMessages);
}
@Override

View File

@ -55,8 +55,13 @@ public class MessageFramer implements Framer {
* closed and there is no data to deliver.
* @param endOfStream whether the frame is the last one for the GRPC stream
* @param flush {@code true} if more data may not be arriving soon
* @param numMessages the number of messages that this series of frames represents
*/
void deliverFrame(@Nullable WritableBuffer frame, boolean endOfStream, boolean flush);
void deliverFrame(
@Nullable WritableBuffer frame,
boolean endOfStream,
boolean flush,
int numMessages);
}
private static final int HEADER_LENGTH = 5;
@ -74,11 +79,10 @@ public class MessageFramer implements Framer {
private final WritableBufferAllocator bufferAllocator;
private final StatsTraceContext statsTraceCtx;
// transportTracer is nullable until it is integrated with client transports
@Nullable
private final TransportTracer transportTracer;
private boolean closed;
// Tracing and stats-related states
private int messagesBuffered;
private int currentMessageSeqNo = -1;
private long currentMessageWireSize;
@ -89,14 +93,10 @@ public class MessageFramer implements Framer {
* @param bufferAllocator allocates buffers that the transport can commit to the wire.
*/
public MessageFramer(
Sink sink,
WritableBufferAllocator bufferAllocator,
StatsTraceContext statsTraceCtx,
@Nullable TransportTracer transportTracer) {
Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
this.sink = checkNotNull(sink, "sink");
this.bufferAllocator = checkNotNull(bufferAllocator, "bufferAllocator");
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = transportTracer;
}
@Override
@ -125,6 +125,7 @@ public class MessageFramer implements Framer {
@Override
public void writePayload(InputStream message) {
verifyNotClosed();
messagesBuffered++;
currentMessageSeqNo++;
currentMessageWireSize = 0;
statsTraceCtx.outboundMessage(currentMessageSeqNo);
@ -158,9 +159,6 @@ public class MessageFramer implements Framer {
statsTraceCtx.outboundUncompressedSize(written);
statsTraceCtx.outboundWireSize(currentMessageWireSize);
statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written);
if (transportTracer != null) {
transportTracer.reportMessageSent();
}
}
private int writeUncompressed(InputStream message, int messageLength) throws IOException {
@ -249,11 +247,14 @@ public class MessageFramer implements Framer {
// Note that we are always delivering a small message to the transport here which
// may incur transport framing overhead as it may be sent separately to the contents
// of the GRPC frame.
sink.deliverFrame(writeableHeader, false, false);
// The final message may not be completely written because we do not flush the last buffer.
// Do not report the last message as sent.
sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1);
messagesBuffered = 1;
// Commit all except the last buffer to the sink
List<WritableBuffer> bufferList = bufferChain.bufferList;
for (int i = 0; i < bufferList.size() - 1; i++) {
sink.deliverFrame(bufferList.get(i), false, false);
sink.deliverFrame(bufferList.get(i), false, false, 0);
}
// Assign the current buffer to the last in the chain so it can be used
// for future writes or written with end-of-stream=true on close.
@ -346,7 +347,8 @@ public class MessageFramer implements Framer {
private void commitToSink(boolean endOfStream, boolean flush) {
WritableBuffer buf = buffer;
buffer = null;
sink.deliverFrame(buf, endOfStream, flush);
sink.deliverFrame(buf, endOfStream, flush, messagesBuffered);
messagesBuffered = 0;
}
private void verifyNotClosed() {

View File

@ -32,9 +32,8 @@ public final class TransportTracer {
private long keepAlivesSent;
private FlowControlReader flowControlWindowReader;
// TODO(zpencer): msg sent can piggyback on framer's writequeue and avoid syncing the update
private final LongCounter messagesSent = LongCounterFactory.create();
private volatile long lastMessageSentTimeNanos;
private long messagesSent;
private long lastMessageSentTimeNanos;
// deframing happens on the application thread, and there's no easy way to avoid synchronization
private final LongCounter messagesReceived = LongCounterFactory.create();
private volatile long lastMessageReceivedTimeNanos;
@ -48,7 +47,7 @@ public final class TransportTracer {
lastStreamCreatedTimeNanos,
streamsSucceeded,
streamsFailed,
messagesSent.value(),
messagesSent,
messagesReceived.value(),
keepAlivesSent,
lastMessageSentTimeNanos,
@ -80,7 +79,7 @@ public final class TransportTracer {
* Reports that a message was successfully sent. This method is thread safe.
*/
public void reportMessageSent() {
messagesSent.add(1);
messagesSent++;
lastMessageSentTimeNanos = currentTimeNanos();
}

View File

@ -326,7 +326,8 @@ public class AbstractClientStreamTest {
assertTrue(payloadCaptor.getValue() != null);
// GET requests don't have BODY.
verify(sink, never())
.writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Boolean.class));
.writeFrame(
any(WritableBuffer.class), any(Boolean.class), any(Boolean.class), any(Integer.class));
assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()");
assertThat(tracer.nextOutboundEvent()).matches("outboundMessageSent\\(0, [0-9]+, [0-9]+\\)");
@ -393,7 +394,8 @@ public class AbstractClientStreamTest {
public void request(int numMessages) {}
@Override
public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {}
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {}
@Override
public void cancel(Status reason) {}

View File

@ -74,8 +74,7 @@ public class AbstractServerStreamTest {
stream = new AbstractServerStreamBase(
allocator,
sink,
new AbstractServerStreamBase.TransportState(MAX_MESSAGE_SIZE, transportTracer),
transportTracer);
new AbstractServerStreamBase.TransportState(MAX_MESSAGE_SIZE, transportTracer));
}
/**
@ -267,7 +266,8 @@ public class AbstractServerStreamTest {
stream.writeMessage(new ByteArrayInputStream(new byte[]{}));
verify(sink, never()).writeFrame(any(WritableBuffer.class), any(Boolean.class));
verify(sink, never())
.writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Integer.class));
}
@Test
@ -277,7 +277,7 @@ public class AbstractServerStreamTest {
stream.writeMessage(new ByteArrayInputStream(new byte[]{}));
stream.flush();
verify(sink).writeFrame(any(WritableBuffer.class), eq(true));
verify(sink).writeFrame(any(WritableBuffer.class), eq(true), eq(1));
}
@Test
@ -346,8 +346,8 @@ public class AbstractServerStreamTest {
private final AbstractServerStream.TransportState state;
protected AbstractServerStreamBase(WritableBufferAllocator bufferAllocator, Sink sink,
AbstractServerStream.TransportState state, TransportTracer transportTracer) {
super(bufferAllocator, StatsTraceContext.NOOP, transportTracer);
AbstractServerStream.TransportState state) {
super(bufferAllocator, StatsTraceContext.NOOP);
this.sink = sink;
this.state = state;
}

View File

@ -16,7 +16,6 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -35,7 +34,6 @@ import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -65,7 +63,6 @@ public class MessageFramerTest {
private BytesWritableBufferAllocator allocator =
new BytesWritableBufferAllocator(1000, 1000);
private StatsTraceContext statsTraceCtx;
private TransportTracer transportTracer;
/** Set up for test. */
@Before
@ -74,8 +71,7 @@ public class MessageFramerTest {
// MessageDeframerTest tests with a client-side StatsTraceContext, so here we test with a
// server-side StatsTraceContext.
statsTraceCtx = new StatsTraceContext(new StreamTracer[]{tracer});
transportTracer = new TransportTracer();
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(sink, allocator, statsTraceCtx);
}
@Test
@ -84,7 +80,7 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true, 1);
assertEquals(1, allocator.allocCount);
verifyNoMoreInteractions(sink);
checkStats(2, 2);
@ -95,8 +91,8 @@ public class MessageFramerTest {
writeUnknownLength(framer, new byte[]{3, 14});
framer.flush();
// Header is written first, then payload
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2}), false, false);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2}), false, false, 0);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true, 1);
assertEquals(2, allocator.allocCount);
verifyNoMoreInteractions(sink);
checkStats(2, 2);
@ -110,7 +106,7 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true);
toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true, 2);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(1, 1, 1, 1);
@ -122,7 +118,7 @@ public class MessageFramerTest {
verifyNoMoreInteractions(sink);
framer.close();
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true);
toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true, 1);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(7, 7);
@ -131,7 +127,7 @@ public class MessageFramerTest {
@Test
public void closeWithoutBufferedFrameGivesNullBuffer() {
framer.close();
verify(sink).deliverFrame(null, true, true);
verify(sink).deliverFrame(null, true, true, 0);
verifyNoMoreInteractions(sink);
assertEquals(0, allocator.allocCount);
checkStats();
@ -140,14 +136,14 @@ public class MessageFramerTest {
@Test
public void payloadSplitBetweenSinks() {
allocator = new BytesWritableBufferAllocator(12, 12);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(sink, allocator, statsTraceCtx);
writeKnownLength(framer, new byte[]{3, 14, 1, 5, 9, 2, 6, 5});
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false);
toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false, 1);
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true, 0);
verifyNoMoreInteractions(sink);
assertEquals(2, allocator.allocCount);
checkStats(8, 8);
@ -156,15 +152,15 @@ public class MessageFramerTest {
@Test
public void frameHeaderSplitBetweenSinks() {
allocator = new BytesWritableBufferAllocator(12, 12);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(sink, allocator, statsTraceCtx);
writeKnownLength(framer, new byte[]{3, 14, 1});
writeKnownLength(framer, new byte[]{3});
verify(sink).deliverFrame(
toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false);
toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false, 2);
verifyNoMoreInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true);
verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true, 0);
verifyNoMoreInteractions(sink);
assertEquals(2, allocator.allocCount);
checkStats(3, 3, 1, 1);
@ -174,7 +170,7 @@ public class MessageFramerTest {
public void emptyPayloadYieldsFrame() throws Exception {
writeKnownLength(framer, new byte[0]);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1);
assertEquals(1, allocator.allocCount);
checkStats(0, 0);
}
@ -184,7 +180,7 @@ public class MessageFramerTest {
writeUnknownLength(framer, new byte[0]);
verifyZeroInteractions(sink);
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1);
// One alloc for the header
assertEquals(1, allocator.allocCount);
checkStats(0, 0);
@ -195,7 +191,7 @@ public class MessageFramerTest {
writeKnownLength(framer, new byte[]{3, 14});
framer.flush();
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true, 1);
verifyNoMoreInteractions(sink);
assertEquals(1, allocator.allocCount);
checkStats(2, 2);
@ -204,10 +200,10 @@ public class MessageFramerTest {
@Test
public void largerFrameSize() throws Exception {
allocator = new BytesWritableBufferAllocator(0, 10000);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(sink, allocator, statsTraceCtx);
writeKnownLength(framer, new byte[1000]);
framer.flush();
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));
ByteWritableBuffer buffer = frameCaptor.getValue();
assertEquals(1005, buffer.size());
@ -225,13 +221,14 @@ public class MessageFramerTest {
public void largerFrameSizeUnknownLength() throws Exception {
// Force payload to be split into two chunks
allocator = new BytesWritableBufferAllocator(500, 500);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(sink, allocator, statsTraceCtx);
writeUnknownLength(framer, new byte[1000]);
framer.flush();
// Header and first chunk written with flush = false
verify(sink, times(2)).deliverFrame(frameCaptor.capture(), eq(false), eq(false));
verify(sink, times(2)).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0));
// On flush third buffer written with flish = true
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
// The message count is only bumped when a message is completely written.
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));
// header has fixed length of 5 and specifies correct length
assertEquals(5, frameCaptor.getAllValues().get(0).readableBytes());
@ -252,13 +249,14 @@ public class MessageFramerTest {
public void compressed() throws Exception {
allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
// setMessageCompression should default to true
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer)
framer = new MessageFramer(sink, allocator, statsTraceCtx)
.setCompressor(new Codec.Gzip());
writeKnownLength(framer, new byte[1000]);
framer.flush();
// The GRPC header is written first as a separate frame.
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
// The message count is only bumped when a message is completely written.
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));
// Check the header
ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0);
@ -277,12 +275,12 @@ public class MessageFramerTest {
@Test
public void dontCompressIfNoEncoding() throws Exception {
allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer)
framer = new MessageFramer(sink, allocator, statsTraceCtx)
.setMessageCompression(true);
writeKnownLength(framer, new byte[1000]);
framer.flush();
// The GRPC header is written first as a separate frame
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));
// Check the header
ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0);
@ -302,13 +300,13 @@ public class MessageFramerTest {
@Test
public void dontCompressIfNotRequested() throws Exception {
allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE);
framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer)
framer = new MessageFramer(sink, allocator, statsTraceCtx)
.setCompressor(new Codec.Gzip())
.setMessageCompression(false);
writeKnownLength(framer, new byte[1000]);
framer.flush();
// The GRPC header is written first as a separate frame
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true));
verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1));
// Check the header
ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0);
@ -330,7 +328,8 @@ public class MessageFramerTest {
MessageFramer.Sink reentrant = new MessageFramer.Sink() {
int count = 0;
@Override
public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
if (count == 0) {
framer.close();
count++;
@ -339,7 +338,7 @@ public class MessageFramerTest {
}
}
};
framer = new MessageFramer(reentrant, allocator, statsTraceCtx, transportTracer);
framer = new MessageFramer(reentrant, allocator, statsTraceCtx);
writeKnownLength(framer, new byte[]{3, 14});
framer.close();
}
@ -350,7 +349,7 @@ public class MessageFramerTest {
framer.setMessageCompression(true);
writeKnownLength(framer, new byte[]{});
framer.flush();
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true);
verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1);
checkStats(0, 0);
}
@ -394,16 +393,6 @@ public class MessageFramerTest {
assertNull(tracer.nextInboundEvent());
assertEquals(expectedWireSize, tracer.getOutboundWireSize());
assertEquals(expectedUncompressedSize, tracer.getOutboundUncompressedSize());
TransportTracer.Stats transportStats = transportTracer.getStats();
assertEquals(count, transportStats.messagesSent);
long transportSentMsgMs = TimeUnit.NANOSECONDS.toMillis(
transportStats.lastMessageSentTimeNanos);
if (count > 0) {
assertThat(System.currentTimeMillis() - transportSentMsgMs).isAtMost(50L);
} else {
assertEquals(0, transportSentMsgMs);
}
}
static class ByteWritableBuffer implements WritableBuffer {

View File

@ -146,14 +146,15 @@ class NettyClientStream extends AbstractClientStream {
}
@Override
public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
if (numBytes > 0) {
// Add the bytes to outbound flow control.
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream),
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream, numMessages),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -168,7 +169,9 @@ class NettyClientStream extends AbstractClientStream {
}), flush);
} else {
// The frame is empty and will not impact outbound flow control. Just send it.
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream, numMessages),
flush);
}
}

View File

@ -48,6 +48,7 @@ class NettyServerStream extends AbstractServerStream {
private final WriteQueue writeQueue;
private final Attributes attributes;
private final String authority;
private final TransportTracer transportTracer;
public NettyServerStream(
Channel channel,
@ -56,12 +57,13 @@ class NettyServerStream extends AbstractServerStream {
String authority,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, transportTracer);
super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx);
this.state = checkNotNull(state, "transportState");
this.channel = checkNotNull(channel, "channel");
this.writeQueue = state.handler.getWriteQueue();
this.attributes = checkNotNull(transportAttrs);
this.authority = authority;
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
}
@Override
@ -110,7 +112,7 @@ class NettyServerStream extends AbstractServerStream {
}
@Override
public void writeFrame(WritableBuffer frame, boolean flush) {
public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) {
if (frame == null) {
writeQueue.scheduleFlush();
return;
@ -120,13 +122,16 @@ class NettyServerStream extends AbstractServerStream {
// Add the bytes to outbound flow control.
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, false),
new SendGrpcFrameCommand(transportState(), bytebuf, false, numMessages),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
if (future.isSuccess()) {
transportTracer.reportMessageSent();
}
}
}), flush);
}

View File

@ -28,13 +28,16 @@ import io.netty.channel.ChannelPromise;
class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
private final StreamIdHolder stream;
private final boolean endStream;
private final int numMessages;
private ChannelPromise promise;
SendGrpcFrameCommand(StreamIdHolder stream, ByteBuf content, boolean endStream) {
SendGrpcFrameCommand(
StreamIdHolder stream, ByteBuf content, boolean endStream, int numMessages) {
super(content);
this.stream = stream;
this.endStream = endStream;
this.numMessages = numMessages;
}
int streamId() {
@ -47,12 +50,12 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu
@Override
public ByteBufHolder copy() {
return new SendGrpcFrameCommand(stream, content().copy(), endStream);
return new SendGrpcFrameCommand(stream, content().copy(), endStream, numMessages);
}
@Override
public ByteBufHolder duplicate() {
return new SendGrpcFrameCommand(stream, content().duplicate(), endStream);
return new SendGrpcFrameCommand(stream, content().duplicate(), endStream, numMessages);
}
@Override
@ -86,13 +89,14 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu
}
SendGrpcFrameCommand thatCmd = (SendGrpcFrameCommand) that;
return thatCmd.stream.equals(stream) && thatCmd.endStream == endStream
&& thatCmd.content().equals(content());
&& thatCmd.content().equals(content()) && thatCmd.numMessages == numMessages;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(streamId=" + streamId()
+ ", endStream=" + endStream + ", content=" + content()
+ ", numMessages=" + numMessages
+ ")";
}
@ -103,6 +107,7 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu
if (endStream) {
hash = -hash;
}
hash = hash * 31 + numMessages;
return hash;
}

View File

@ -267,7 +267,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
createStream();
// Send a frame and verify that it was written.
ChannelFuture future = enqueue(new SendGrpcFrameCommand(streamTransportState, content(), true));
ChannelFuture future
= enqueue(new SendGrpcFrameCommand(streamTransportState, content(), true, 1));
assertTrue(future.isSuccess());
verifyWrite().writeData(eq(ctx()), eq(3), eq(content()), eq(0), eq(true),
@ -278,7 +279,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void sendForUnknownStreamShouldFail() throws Exception {
ChannelFuture future = enqueue(new SendGrpcFrameCommand(streamTransportState, content(), true));
ChannelFuture future
= enqueue(new SendGrpcFrameCommand(streamTransportState, content(), true, 1));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
}

View File

@ -176,7 +176,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
stream.writeMessage(new ByteArrayInputStream(msg));
stream.flush();
verify(writeQueue).enqueue(
eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false)),
eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false, 1)),
any(ChannelPromise.class),
eq(true));
}
@ -189,14 +189,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
stream.writeMessage(new BufferedInputStream(new ByteArrayInputStream(msg)));
stream.flush();
// Two writes occur, one for the GRPC frame header and the second with the payload
// The framer reports the message count when the payload is completely written
verify(writeQueue).enqueue(
eq(new SendGrpcFrameCommand(
stream.transportState(), messageFrame(MESSAGE).slice(0, 5), false)),
stream.transportState(), messageFrame(MESSAGE).slice(0, 5), false, 0)),
any(ChannelPromise.class),
eq(false));
verify(writeQueue).enqueue(
eq(new SendGrpcFrameCommand(
stream.transportState(), messageFrame(MESSAGE).slice(5, 11), false)),
stream.transportState(), messageFrame(MESSAGE).slice(5, 11), false, 1)),
any(ChannelPromise.class),
eq(true));
}

View File

@ -244,7 +244,8 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
MessageFramer framer = new MessageFramer(
new MessageFramer.Sink() {
@Override
public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public void deliverFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
if (frame != null) {
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
compressionFrame.writeBytes(bytebuf);
@ -252,8 +253,7 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
}
},
new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT),
StatsTraceContext.NOOP,
noTransportTracer);
StatsTraceContext.NOOP);
framer.writePayload(new ByteArrayInputStream(content));
framer.flush();
ChannelHandlerContext ctx = newMockContext();

View File

@ -202,7 +202,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
// Send a frame and verify that it was written.
ChannelFuture future = enqueue(
new SendGrpcFrameCommand(stream.transportState(), content(), false));
new SendGrpcFrameCommand(stream.transportState(), content(), false, 1));
assertTrue(future.isSuccess());
verifyWrite().writeData(eq(ctx()), eq(STREAM_ID), eq(content()), eq(0), eq(false),
any(ChannelPromise.class));
@ -559,7 +559,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
payload.writeLong(1);
for (int i = 0; i < 10; i++) {
future = enqueue(
new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false, 1));
future.get();
channel().releaseOutbound();
channelRead(pingFrame(false /* isAck */, payload.slice()));

View File

@ -124,7 +124,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
stream.flush();
verify(writeQueue).enqueue(
eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false)),
eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false, 1)),
isA(ChannelPromise.class),
eq(true));
}

View File

@ -132,7 +132,8 @@ class OkHttpClientStream extends AbstractClientStream {
}
@Override
public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
public void writeFrame(
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
Buffer buffer;
if (frame == null) {
buffer = EMPTY_BUFFER;