From 9fac15d4f8fb09106fc4a80d761afbe1594b4d0c Mon Sep 17 00:00:00 2001 From: zpencer Date: Mon, 6 Nov 2017 12:46:17 -0800 Subject: [PATCH] core,netty,okhttp: move transport tracer outbound counters to transport thread (#3661) Counters are bumped when a message is completely written. If a part of a message is still buffered and not yet flushed, we will not increment the stats. --- .../grpc/internal/AbstractClientStream.java | 12 +-- .../grpc/internal/AbstractServerStream.java | 16 ++-- .../java/io/grpc/internal/MessageFramer.java | 30 +++---- .../io/grpc/internal/TransportTracer.java | 9 +-- .../internal/AbstractClientStreamTest.java | 6 +- .../internal/AbstractServerStreamTest.java | 12 +-- .../io/grpc/internal/MessageFramerTest.java | 79 ++++++++----------- .../java/io/grpc/netty/NettyClientStream.java | 9 ++- .../java/io/grpc/netty/NettyServerStream.java | 11 ++- .../io/grpc/netty/SendGrpcFrameCommand.java | 13 ++- .../io/grpc/netty/NettyClientHandlerTest.java | 6 +- .../io/grpc/netty/NettyClientStreamTest.java | 7 +- .../io/grpc/netty/NettyHandlerTestBase.java | 6 +- .../io/grpc/netty/NettyServerHandlerTest.java | 4 +- .../io/grpc/netty/NettyServerStreamTest.java | 2 +- .../io/grpc/okhttp/OkHttpClientStream.java | 3 +- 16 files changed, 117 insertions(+), 108 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 72532627f3..26946b7f57 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -64,8 +64,10 @@ public abstract class AbstractClientStream extends AbstractStream * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be * {@code true} if this is {@code true} * @param flush {@code true} if more data may not be arriving soon + * @Param numMessages the number of messages this series of frames represents */ - void writeFrame(@Nullable WritableBuffer frame, boolean endOfStream, boolean flush); + void writeFrame( + @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages); /** * Requests up to the given number of messages from the call to be delivered to the client. This @@ -100,8 +102,7 @@ public abstract class AbstractClientStream extends AbstractStream Preconditions.checkNotNull(headers, "headers"); this.useGet = useGet; if (!useGet) { - TransportTracer transportTracer = null; // TODO(zpencer): add tracing on clients - framer = new MessageFramer(this, bufferAllocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); this.headers = headers; } else { framer = new GetFramer(headers, statsTraceCtx); @@ -158,9 +159,10 @@ public abstract class AbstractClientStream extends AbstractStream } @Override - public final void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + public final void deliverFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS"); - abstractClientStreamSink().writeFrame(frame, endOfStream, flush); + abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages); } @Override diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index f27ccfb43b..0132847c55 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -48,8 +48,9 @@ public abstract class AbstractServerStream extends AbstractStream * * @param frame a buffer containing the chunk of data to be sent. * @param flush {@code true} if more data may not be arriving soon + * @param numMessages the number of messages this frame represents */ - void writeFrame(@Nullable WritableBuffer frame, boolean flush); + void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages); /** * Sends trailers to the remote end point. This call implies end of stream. @@ -77,17 +78,13 @@ public abstract class AbstractServerStream extends AbstractStream private final MessageFramer framer; private final StatsTraceContext statsTraceCtx; - private final TransportTracer transportTracer; private boolean outboundClosed; private boolean headersSent; protected AbstractServerStream( - WritableBufferAllocator bufferAllocator, - StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { + WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) { this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); - this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); - framer = new MessageFramer(this, bufferAllocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); } @Override @@ -118,10 +115,11 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - public final void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + public final void deliverFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { // Since endOfStream is triggered by the sending of trailers, avoid flush here and just flush // after the trailers. - abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush); + abstractServerStreamSink().writeFrame(frame, endOfStream ? false : flush, numMessages); } @Override diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index dc86f19031..8c95ffbdfb 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -55,8 +55,13 @@ public class MessageFramer implements Framer { * closed and there is no data to deliver. * @param endOfStream whether the frame is the last one for the GRPC stream * @param flush {@code true} if more data may not be arriving soon + * @param numMessages the number of messages that this series of frames represents */ - void deliverFrame(@Nullable WritableBuffer frame, boolean endOfStream, boolean flush); + void deliverFrame( + @Nullable WritableBuffer frame, + boolean endOfStream, + boolean flush, + int numMessages); } private static final int HEADER_LENGTH = 5; @@ -74,11 +79,10 @@ public class MessageFramer implements Framer { private final WritableBufferAllocator bufferAllocator; private final StatsTraceContext statsTraceCtx; // transportTracer is nullable until it is integrated with client transports - @Nullable - private final TransportTracer transportTracer; private boolean closed; // Tracing and stats-related states + private int messagesBuffered; private int currentMessageSeqNo = -1; private long currentMessageWireSize; @@ -89,14 +93,10 @@ public class MessageFramer implements Framer { * @param bufferAllocator allocates buffers that the transport can commit to the wire. */ public MessageFramer( - Sink sink, - WritableBufferAllocator bufferAllocator, - StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer) { + Sink sink, WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) { this.sink = checkNotNull(sink, "sink"); this.bufferAllocator = checkNotNull(bufferAllocator, "bufferAllocator"); this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); - this.transportTracer = transportTracer; } @Override @@ -125,6 +125,7 @@ public class MessageFramer implements Framer { @Override public void writePayload(InputStream message) { verifyNotClosed(); + messagesBuffered++; currentMessageSeqNo++; currentMessageWireSize = 0; statsTraceCtx.outboundMessage(currentMessageSeqNo); @@ -158,9 +159,6 @@ public class MessageFramer implements Framer { statsTraceCtx.outboundUncompressedSize(written); statsTraceCtx.outboundWireSize(currentMessageWireSize); statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written); - if (transportTracer != null) { - transportTracer.reportMessageSent(); - } } private int writeUncompressed(InputStream message, int messageLength) throws IOException { @@ -249,11 +247,14 @@ public class MessageFramer implements Framer { // Note that we are always delivering a small message to the transport here which // may incur transport framing overhead as it may be sent separately to the contents // of the GRPC frame. - sink.deliverFrame(writeableHeader, false, false); + // The final message may not be completely written because we do not flush the last buffer. + // Do not report the last message as sent. + sink.deliverFrame(writeableHeader, false, false, messagesBuffered - 1); + messagesBuffered = 1; // Commit all except the last buffer to the sink List bufferList = bufferChain.bufferList; for (int i = 0; i < bufferList.size() - 1; i++) { - sink.deliverFrame(bufferList.get(i), false, false); + sink.deliverFrame(bufferList.get(i), false, false, 0); } // Assign the current buffer to the last in the chain so it can be used // for future writes or written with end-of-stream=true on close. @@ -346,7 +347,8 @@ public class MessageFramer implements Framer { private void commitToSink(boolean endOfStream, boolean flush) { WritableBuffer buf = buffer; buffer = null; - sink.deliverFrame(buf, endOfStream, flush); + sink.deliverFrame(buf, endOfStream, flush, messagesBuffered); + messagesBuffered = 0; } private void verifyNotClosed() { diff --git a/core/src/main/java/io/grpc/internal/TransportTracer.java b/core/src/main/java/io/grpc/internal/TransportTracer.java index bb726a4042..23a5212c24 100644 --- a/core/src/main/java/io/grpc/internal/TransportTracer.java +++ b/core/src/main/java/io/grpc/internal/TransportTracer.java @@ -32,9 +32,8 @@ public final class TransportTracer { private long keepAlivesSent; private FlowControlReader flowControlWindowReader; - // TODO(zpencer): msg sent can piggyback on framer's writequeue and avoid syncing the update - private final LongCounter messagesSent = LongCounterFactory.create(); - private volatile long lastMessageSentTimeNanos; + private long messagesSent; + private long lastMessageSentTimeNanos; // deframing happens on the application thread, and there's no easy way to avoid synchronization private final LongCounter messagesReceived = LongCounterFactory.create(); private volatile long lastMessageReceivedTimeNanos; @@ -48,7 +47,7 @@ public final class TransportTracer { lastStreamCreatedTimeNanos, streamsSucceeded, streamsFailed, - messagesSent.value(), + messagesSent, messagesReceived.value(), keepAlivesSent, lastMessageSentTimeNanos, @@ -80,7 +79,7 @@ public final class TransportTracer { * Reports that a message was successfully sent. This method is thread safe. */ public void reportMessageSent() { - messagesSent.add(1); + messagesSent++; lastMessageSentTimeNanos = currentTimeNanos(); } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 54fc379487..364aee96bc 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -326,7 +326,8 @@ public class AbstractClientStreamTest { assertTrue(payloadCaptor.getValue() != null); // GET requests don't have BODY. verify(sink, never()) - .writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Boolean.class)); + .writeFrame( + any(WritableBuffer.class), any(Boolean.class), any(Boolean.class), any(Integer.class)); assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()"); assertThat(tracer.nextOutboundEvent()).matches("outboundMessageSent\\(0, [0-9]+, [0-9]+\\)"); @@ -393,7 +394,8 @@ public class AbstractClientStreamTest { public void request(int numMessages) {} @Override - public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {} + public void writeFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {} @Override public void cancel(Status reason) {} diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java index 94811bcd23..8c7406ae9a 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java @@ -74,8 +74,7 @@ public class AbstractServerStreamTest { stream = new AbstractServerStreamBase( allocator, sink, - new AbstractServerStreamBase.TransportState(MAX_MESSAGE_SIZE, transportTracer), - transportTracer); + new AbstractServerStreamBase.TransportState(MAX_MESSAGE_SIZE, transportTracer)); } /** @@ -267,7 +266,8 @@ public class AbstractServerStreamTest { stream.writeMessage(new ByteArrayInputStream(new byte[]{})); - verify(sink, never()).writeFrame(any(WritableBuffer.class), any(Boolean.class)); + verify(sink, never()) + .writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Integer.class)); } @Test @@ -277,7 +277,7 @@ public class AbstractServerStreamTest { stream.writeMessage(new ByteArrayInputStream(new byte[]{})); stream.flush(); - verify(sink).writeFrame(any(WritableBuffer.class), eq(true)); + verify(sink).writeFrame(any(WritableBuffer.class), eq(true), eq(1)); } @Test @@ -346,8 +346,8 @@ public class AbstractServerStreamTest { private final AbstractServerStream.TransportState state; protected AbstractServerStreamBase(WritableBufferAllocator bufferAllocator, Sink sink, - AbstractServerStream.TransportState state, TransportTracer transportTracer) { - super(bufferAllocator, StatsTraceContext.NOOP, transportTracer); + AbstractServerStream.TransportState state) { + super(bufferAllocator, StatsTraceContext.NOOP); this.sink = sink; this.state = state; } diff --git a/core/src/test/java/io/grpc/internal/MessageFramerTest.java b/core/src/test/java/io/grpc/internal/MessageFramerTest.java index fd9cf40786..7cc6f7c1a0 100644 --- a/core/src/test/java/io/grpc/internal/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageFramerTest.java @@ -16,7 +16,6 @@ package io.grpc.internal; -import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -35,7 +34,6 @@ import java.io.ByteArrayInputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -65,7 +63,6 @@ public class MessageFramerTest { private BytesWritableBufferAllocator allocator = new BytesWritableBufferAllocator(1000, 1000); private StatsTraceContext statsTraceCtx; - private TransportTracer transportTracer; /** Set up for test. */ @Before @@ -74,8 +71,7 @@ public class MessageFramerTest { // MessageDeframerTest tests with a client-side StatsTraceContext, so here we test with a // server-side StatsTraceContext. statsTraceCtx = new StatsTraceContext(new StreamTracer[]{tracer}); - transportTracer = new TransportTracer(); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(sink, allocator, statsTraceCtx); } @Test @@ -84,7 +80,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true, 1); assertEquals(1, allocator.allocCount); verifyNoMoreInteractions(sink); checkStats(2, 2); @@ -95,8 +91,8 @@ public class MessageFramerTest { writeUnknownLength(framer, new byte[]{3, 14}); framer.flush(); // Header is written first, then payload - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2}), false, false); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2}), false, false, 0); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true, 1); assertEquals(2, allocator.allocCount); verifyNoMoreInteractions(sink); checkStats(2, 2); @@ -110,7 +106,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.flush(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true); + toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true, 2); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); checkStats(1, 1, 1, 1); @@ -122,7 +118,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); framer.close(); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true); + toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true, 1); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); checkStats(7, 7); @@ -131,7 +127,7 @@ public class MessageFramerTest { @Test public void closeWithoutBufferedFrameGivesNullBuffer() { framer.close(); - verify(sink).deliverFrame(null, true, true); + verify(sink).deliverFrame(null, true, true, 0); verifyNoMoreInteractions(sink); assertEquals(0, allocator.allocCount); checkStats(); @@ -140,14 +136,14 @@ public class MessageFramerTest { @Test public void payloadSplitBetweenSinks() { allocator = new BytesWritableBufferAllocator(12, 12); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(sink, allocator, statsTraceCtx); writeKnownLength(framer, new byte[]{3, 14, 1, 5, 9, 2, 6, 5}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false, false, 1); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true, 0); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); checkStats(8, 8); @@ -156,15 +152,15 @@ public class MessageFramerTest { @Test public void frameHeaderSplitBetweenSinks() { allocator = new BytesWritableBufferAllocator(12, 12); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(sink, allocator, statsTraceCtx); writeKnownLength(framer, new byte[]{3, 14, 1}); writeKnownLength(framer, new byte[]{3}); verify(sink).deliverFrame( - toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false, false, 2); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true); + verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true, 0); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); checkStats(3, 3, 1, 1); @@ -174,7 +170,7 @@ public class MessageFramerTest { public void emptyPayloadYieldsFrame() throws Exception { writeKnownLength(framer, new byte[0]); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1); assertEquals(1, allocator.allocCount); checkStats(0, 0); } @@ -184,7 +180,7 @@ public class MessageFramerTest { writeUnknownLength(framer, new byte[0]); verifyZeroInteractions(sink); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1); // One alloc for the header assertEquals(1, allocator.allocCount); checkStats(0, 0); @@ -195,7 +191,7 @@ public class MessageFramerTest { writeKnownLength(framer, new byte[]{3, 14}); framer.flush(); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true, 1); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); checkStats(2, 2); @@ -204,10 +200,10 @@ public class MessageFramerTest { @Test public void largerFrameSize() throws Exception { allocator = new BytesWritableBufferAllocator(0, 10000); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(sink, allocator, statsTraceCtx); writeKnownLength(framer, new byte[1000]); framer.flush(); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); ByteWritableBuffer buffer = frameCaptor.getValue(); assertEquals(1005, buffer.size()); @@ -225,13 +221,14 @@ public class MessageFramerTest { public void largerFrameSizeUnknownLength() throws Exception { // Force payload to be split into two chunks allocator = new BytesWritableBufferAllocator(500, 500); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(sink, allocator, statsTraceCtx); writeUnknownLength(framer, new byte[1000]); framer.flush(); // Header and first chunk written with flush = false - verify(sink, times(2)).deliverFrame(frameCaptor.capture(), eq(false), eq(false)); + verify(sink, times(2)).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0)); // On flush third buffer written with flish = true - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); + // The message count is only bumped when a message is completely written. + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // header has fixed length of 5 and specifies correct length assertEquals(5, frameCaptor.getAllValues().get(0).readableBytes()); @@ -252,13 +249,14 @@ public class MessageFramerTest { public void compressed() throws Exception { allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE); // setMessageCompression should default to true - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer) + framer = new MessageFramer(sink, allocator, statsTraceCtx) .setCompressor(new Codec.Gzip()); writeKnownLength(framer, new byte[1000]); framer.flush(); // The GRPC header is written first as a separate frame. - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false)); - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); + // The message count is only bumped when a message is completely written. + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(false), eq(0)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // Check the header ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0); @@ -277,12 +275,12 @@ public class MessageFramerTest { @Test public void dontCompressIfNoEncoding() throws Exception { allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer) + framer = new MessageFramer(sink, allocator, statsTraceCtx) .setMessageCompression(true); writeKnownLength(framer, new byte[1000]); framer.flush(); // The GRPC header is written first as a separate frame - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // Check the header ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0); @@ -302,13 +300,13 @@ public class MessageFramerTest { @Test public void dontCompressIfNotRequested() throws Exception { allocator = new BytesWritableBufferAllocator(100, Integer.MAX_VALUE); - framer = new MessageFramer(sink, allocator, statsTraceCtx, transportTracer) + framer = new MessageFramer(sink, allocator, statsTraceCtx) .setCompressor(new Codec.Gzip()) .setMessageCompression(false); writeKnownLength(framer, new byte[1000]); framer.flush(); // The GRPC header is written first as a separate frame - verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true)); + verify(sink).deliverFrame(frameCaptor.capture(), eq(false), eq(true), eq(1)); // Check the header ByteWritableBuffer buffer = frameCaptor.getAllValues().get(0); @@ -330,7 +328,8 @@ public class MessageFramerTest { MessageFramer.Sink reentrant = new MessageFramer.Sink() { int count = 0; @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + public void deliverFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { if (count == 0) { framer.close(); count++; @@ -339,7 +338,7 @@ public class MessageFramerTest { } } }; - framer = new MessageFramer(reentrant, allocator, statsTraceCtx, transportTracer); + framer = new MessageFramer(reentrant, allocator, statsTraceCtx); writeKnownLength(framer, new byte[]{3, 14}); framer.close(); } @@ -350,7 +349,7 @@ public class MessageFramerTest { framer.setMessageCompression(true); writeKnownLength(framer, new byte[]{}); framer.flush(); - verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true, 1); checkStats(0, 0); } @@ -394,16 +393,6 @@ public class MessageFramerTest { assertNull(tracer.nextInboundEvent()); assertEquals(expectedWireSize, tracer.getOutboundWireSize()); assertEquals(expectedUncompressedSize, tracer.getOutboundUncompressedSize()); - - TransportTracer.Stats transportStats = transportTracer.getStats(); - assertEquals(count, transportStats.messagesSent); - long transportSentMsgMs = TimeUnit.NANOSECONDS.toMillis( - transportStats.lastMessageSentTimeNanos); - if (count > 0) { - assertThat(System.currentTimeMillis() - transportSentMsgMs).isAtMost(50L); - } else { - assertEquals(0, transportSentMsgMs); - } } static class ByteWritableBuffer implements WritableBuffer { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 41f046695c..9badcb45b6 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -146,14 +146,15 @@ class NettyClientStream extends AbstractClientStream { } @Override - public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + public void writeFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); final int numBytes = bytebuf.readableBytes(); if (numBytes > 0) { // Add the bytes to outbound flow control. onSendingBytes(numBytes); writeQueue.enqueue( - new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), + new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream, numMessages), channel.newPromise().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -168,7 +169,9 @@ class NettyClientStream extends AbstractClientStream { }), flush); } else { // The frame is empty and will not impact outbound flow control. Just send it. - writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); + writeQueue.enqueue( + new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream, numMessages), + flush); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 37c91a50ab..b62896d787 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -48,6 +48,7 @@ class NettyServerStream extends AbstractServerStream { private final WriteQueue writeQueue; private final Attributes attributes; private final String authority; + private final TransportTracer transportTracer; public NettyServerStream( Channel channel, @@ -56,12 +57,13 @@ class NettyServerStream extends AbstractServerStream { String authority, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) { - super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, transportTracer); + super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx); this.state = checkNotNull(state, "transportState"); this.channel = checkNotNull(channel, "channel"); this.writeQueue = state.handler.getWriteQueue(); this.attributes = checkNotNull(transportAttrs); this.authority = authority; + this.transportTracer = checkNotNull(transportTracer, "transportTracer"); } @Override @@ -110,7 +112,7 @@ class NettyServerStream extends AbstractServerStream { } @Override - public void writeFrame(WritableBuffer frame, boolean flush) { + public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) { if (frame == null) { writeQueue.scheduleFlush(); return; @@ -120,13 +122,16 @@ class NettyServerStream extends AbstractServerStream { // Add the bytes to outbound flow control. onSendingBytes(numBytes); writeQueue.enqueue( - new SendGrpcFrameCommand(transportState(), bytebuf, false), + new SendGrpcFrameCommand(transportState(), bytebuf, false, numMessages), channel.newPromise().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Remove the bytes from outbound flow control, optionally notifying // the client that they can send more bytes. transportState().onSentBytes(numBytes); + if (future.isSuccess()) { + transportTracer.reportMessageSent(); + } } }), flush); } diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java index 6a7653eecd..bcb0eeccfb 100644 --- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java @@ -28,13 +28,16 @@ import io.netty.channel.ChannelPromise; class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand { private final StreamIdHolder stream; private final boolean endStream; + private final int numMessages; private ChannelPromise promise; - SendGrpcFrameCommand(StreamIdHolder stream, ByteBuf content, boolean endStream) { + SendGrpcFrameCommand( + StreamIdHolder stream, ByteBuf content, boolean endStream, int numMessages) { super(content); this.stream = stream; this.endStream = endStream; + this.numMessages = numMessages; } int streamId() { @@ -47,12 +50,12 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu @Override public ByteBufHolder copy() { - return new SendGrpcFrameCommand(stream, content().copy(), endStream); + return new SendGrpcFrameCommand(stream, content().copy(), endStream, numMessages); } @Override public ByteBufHolder duplicate() { - return new SendGrpcFrameCommand(stream, content().duplicate(), endStream); + return new SendGrpcFrameCommand(stream, content().duplicate(), endStream, numMessages); } @Override @@ -86,13 +89,14 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu } SendGrpcFrameCommand thatCmd = (SendGrpcFrameCommand) that; return thatCmd.stream.equals(stream) && thatCmd.endStream == endStream - && thatCmd.content().equals(content()); + && thatCmd.content().equals(content()) && thatCmd.numMessages == numMessages; } @Override public String toString() { return getClass().getSimpleName() + "(streamId=" + streamId() + ", endStream=" + endStream + ", content=" + content() + + ", numMessages=" + numMessages + ")"; } @@ -103,6 +107,7 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.Qu if (endStream) { hash = -hash; } + hash = hash * 31 + numMessages; return hash; } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 66b9889e31..49c83da3ae 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -267,7 +267,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { MessageFramer framer = new MessageFramer( new MessageFramer.Sink() { @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + public void deliverFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { if (frame != null) { ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); compressionFrame.writeBytes(bytebuf); @@ -252,8 +253,7 @@ public abstract class NettyHandlerTestBase { } }, new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), - StatsTraceContext.NOOP, - noTransportTracer); + StatsTraceContext.NOOP); framer.writePayload(new ByteArrayInputStream(content)); framer.flush(); ChannelHandlerContext ctx = newMockContext(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 008ba23efc..1f056570c5 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -202,7 +202,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase