From 7e534ed704441cdef7f2af6c094148acf3dabd8b Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Tue, 19 Sep 2017 09:22:11 -0700 Subject: [PATCH] core: record individual messages with sizes to Census/tracing (#3461) Two methods, outboundMessageSent() and inboundMessageRead() are added to StreamTracer in order to associate individual messages with sizes. Both types of sizes are optional, as allowed by Census tracing. Both methods accept a sequence number as the type ID as required by Census. The original outboundMesage() and inboundMessage() are also replaced by overrides that take the sequence number, to better match the new methods. The deprecation of the old overrides are tracked by #3460 --- build.gradle | 2 +- core/build.gradle | 2 + core/src/main/java/io/grpc/StreamTracer.java | 50 ++++++++++++ .../grpc/internal/AbstractClientStream.java | 3 +- .../io/grpc/internal/CensusStatsModule.java | 8 +- .../io/grpc/internal/CensusTracingModule.java | 54 ++++++++++++- .../io/grpc/internal/MessageDeframer.java | 8 +- .../java/io/grpc/internal/MessageFramer.java | 14 +++- .../io/grpc/internal/StatsTraceContext.java | 34 +++++++- .../internal/AbstractClientStreamTest.java | 8 +- .../io/grpc/internal/CensusModulesTest.java | 73 ++++++++++++++--- .../io/grpc/internal/MessageDeframerTest.java | 78 +++++++++++------- .../io/grpc/internal/MessageFramerTest.java | 61 ++++++++------ .../grpc/grpclb/GrpclbClientLoadRecorder.java | 2 +- .../integration/AbstractInteropTest.java | 19 ++++- .../testing/AbstractTransportTest.java | 27 +++++-- .../testing/TestClientStreamTracer.java | 42 +++++++--- .../testing/TestServerStreamTracer.java | 42 +++++++--- .../internal/testing/TestStreamTracer.java | 81 +++++++++++++------ 19 files changed, 468 insertions(+), 140 deletions(-) diff --git a/build.gradle b/build.gradle index d7db6ebed9..c7b00125c3 100644 --- a/build.gradle +++ b/build.gradle @@ -190,7 +190,7 @@ subprojects { google_auth_credentials: 'com.google.auth:google-auth-library-credentials:0.4.0', okhttp: 'com.squareup.okhttp:okhttp:2.5.0', okio: 'com.squareup.okio:okio:1.6.0', - opencensus_api: 'io.opencensus:opencensus-api:0.5.1', + opencensus_api: 'io.opencensus:opencensus-api:0.6.0', instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", diff --git a/core/build.gradle b/core/build.gradle index 81ceffffd7..b6cd5f51fa 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -12,6 +12,8 @@ dependencies { exclude group: 'io.grpc', module: 'grpc-context' } compile (libraries.opencensus_api) { + // prefer 3.0.0 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' // prefer 2.0.19 from libraries instead of 2.0.11 exclude group: 'com.google.errorprone', module: 'error_prone_annotations' // we'll always be more up-to-date diff --git a/core/src/main/java/io/grpc/StreamTracer.java b/core/src/main/java/io/grpc/StreamTracer.java index bf1b33f5f4..3f78c7867e 100644 --- a/core/src/main/java/io/grpc/StreamTracer.java +++ b/core/src/main/java/io/grpc/StreamTracer.java @@ -36,18 +36,68 @@ public abstract class StreamTracer { * An outbound message has been passed to the stream. This is called as soon as the stream knows * about the message, but doesn't have further guarantee such as whether the message is serialized * or not. + * + * @deprecated use {@link #outboundMessage(int)} */ + @Deprecated public void outboundMessage() { } + /** + * An outbound message has been passed to the stream. This is called as soon as the stream knows + * about the message, but doesn't have further guarantee such as whether the message is serialized + * or not. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #outboundMessageSent} for the same message. + */ + public void outboundMessage(int seqNo) { + } + /** * An inbound message has been received by the stream. This is called as soon as the stream knows * about the message, but doesn't have further guarantee such as whether the message is * deserialized or not. + * + * @deprecated use {@link #inboundMessage(int)} */ + @Deprecated public void inboundMessage() { } + /** + * An inbound message has been received by the stream. This is called as soon as the stream knows + * about the message, but doesn't have further guarantee such as whether the message is + * deserialized or not. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #inboundMessageRead} for the same message. + */ + public void inboundMessage(int seqNo) { + } + + /** + * An outbound message has been serialized and sent to the transport. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #outboundMessage(int)} for the same message. + * @param optionalWireSize the wire size of the message. -1 if unknown + * @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown + */ + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + } + + /** + * An inbound message has been fully read from the transport. + * + * @param seqNo the sequential number of the message within the stream, starting from 0. It can + * be used to correlate with {@link #inboundMessage(int)} for the same message. + * @param optionalWireSize the wire size of the message. -1 if unknown + * @param optionalUncompressedSize the uncompressed serialized size of the message. -1 if unknown + */ + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + } + /** * The wire size of some outbound data is revealed. This can only used to record the accumulative * outbound wire size. There is no guarantee wrt timing or granularity of this method. diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 2ba8a78dcc..d6af1f27be 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -359,7 +359,8 @@ public abstract class AbstractClientStream extends AbstractStream } catch (java.io.IOException ex) { throw new RuntimeException(ex); } - statsTraceCtx.outboundMessage(); + statsTraceCtx.outboundMessage(0); + statsTraceCtx.outboundMessageSent(0, payload.length, payload.length); statsTraceCtx.outboundUncompressedSize(payload.length); // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode // it using e.g., base64. However, we are not supposed to know such detail here. diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index d917b6b1a7..7d4cbcce14 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -161,12 +161,12 @@ final class CensusStatsModule { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { inboundMessageCount.incrementAndGet(); } @Override - public void outboundMessage() { + public void outboundMessage(int seqNo) { outboundMessageCount.incrementAndGet(); } } @@ -282,12 +282,12 @@ final class CensusStatsModule { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { inboundMessageCount.incrementAndGet(); } @Override - public void outboundMessage() { + public void outboundMessage(int seqNo) { outboundMessageCount.incrementAndGet(); } diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index 8e31c98985..7520b24bf4 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -33,6 +33,7 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.StreamTracer; import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.NetworkEvent; import io.opencensus.trace.Span; import io.opencensus.trace.SpanContext; import io.opencensus.trace.Status; @@ -56,8 +57,6 @@ import javax.annotation.Nullable; */ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - // TODO(zhangkun83): record NetworkEvent to Span for each message - private static final ClientStreamTracer noopClientTracer = new ClientStreamTracer() {}; private final Tracer censusTracer; @VisibleForTesting @@ -182,6 +181,19 @@ final class CensusTracingModule { return EndSpanOptions.builder().setStatus(convertStatus(status)).build(); } + private static void recordNetworkEvent( + Span span, NetworkEvent.Type type, + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + NetworkEvent.Builder eventBuilder = NetworkEvent.builder(type, seqNo); + if (optionalUncompressedSize != -1) { + eventBuilder.setUncompressedMessageSize(optionalUncompressedSize); + } + if (optionalWireSize != -1) { + eventBuilder.setCompressedMessageSize(optionalWireSize); + } + span.addNetworkEvent(eventBuilder.build()); + } + @VisibleForTesting final class ClientCallTracer extends ClientStreamTracer.Factory { @@ -201,7 +213,7 @@ final class CensusTracingModule { public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { headers.discardAll(tracingHeader); headers.put(tracingHeader, span.getContext()); - return noopClientTracer; + return new ClientTracer(span); } /** @@ -218,6 +230,28 @@ final class CensusTracingModule { } } + private static final class ClientTracer extends ClientStreamTracer { + private final Span span; + + ClientTracer(Span span) { + this.span = checkNotNull(span, "span"); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize); + } + } + private final class ServerTracer extends ServerStreamTracer { private final Span span; private final AtomicBoolean streamClosed = new AtomicBoolean(false); @@ -252,6 +286,20 @@ final class CensusTracingModule { // inherit from the parent Context. return context.withValue(CONTEXT_SPAN_KEY, span); } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.SENT, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordNetworkEvent( + span, NetworkEvent.Type.RECV, seqNo, optionalWireSize, optionalUncompressedSize); + } } @VisibleForTesting diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index f368e2cf1e..3274d7babe 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -94,6 +94,7 @@ public class MessageDeframer implements Closeable, Deframer { private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer(); private long pendingDeliveries; private boolean inDelivery = false; + private int currentMessageSeqNo = -1; private boolean closeWhenComplete = false; private volatile boolean stopDelivery = false; @@ -317,7 +318,8 @@ public class MessageDeframer implements Closeable, Deframer { .asRuntimeException(); } - statsTraceCtx.inboundMessage(); + currentMessageSeqNo++; + statsTraceCtx.inboundMessage(currentMessageSeqNo); // Continue reading the frame body. state = State.BODY; } @@ -326,6 +328,10 @@ public class MessageDeframer implements Closeable, Deframer { * Processes the GRPC message body, which depending on frame header flags may be compressed. */ private void processBody() { + // There is no reliable way to get the uncompressed size per message when it's compressed, + // because the uncompressed bytes are provided through an InputStream whose total size is + // unknown until all bytes are read, and we don't know when it happens. + statsTraceCtx.inboundMessageRead(currentMessageSeqNo, requiredLength, -1); InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody(); nextFrame = null; listener.messagesAvailable(new SingleMessageProducer(stream)); diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 665f84eeea..e401005794 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -75,6 +75,10 @@ public class MessageFramer implements Framer { private final StatsTraceContext statsTraceCtx; private boolean closed; + // Tracing and stats-related states + private int currentMessageSeqNo = -1; + private long currentMessageWireSize; + /** * Creates a {@code MessageFramer}. * @@ -114,7 +118,9 @@ public class MessageFramer implements Framer { @Override public void writePayload(InputStream message) { verifyNotClosed(); - statsTraceCtx.outboundMessage(); + currentMessageSeqNo++; + currentMessageWireSize = 0; + statsTraceCtx.outboundMessage(currentMessageSeqNo); boolean compressed = messageCompression && compressor != Codec.Identity.NONE; int written = -1; int messageLength = -2; @@ -143,11 +149,13 @@ public class MessageFramer implements Framer { throw Status.INTERNAL.withDescription(err).asRuntimeException(); } statsTraceCtx.outboundUncompressedSize(written); + statsTraceCtx.outboundWireSize(currentMessageWireSize); + statsTraceCtx.outboundMessageSent(currentMessageSeqNo, currentMessageWireSize, written); } private int writeUncompressed(InputStream message, int messageLength) throws IOException { if (messageLength != -1) { - statsTraceCtx.outboundWireSize(messageLength); + currentMessageWireSize = messageLength; return writeKnownLengthUncompressed(message, messageLength); } BufferChainOutputStream bufferChain = new BufferChainOutputStream(); @@ -240,7 +248,7 @@ public class MessageFramer implements Framer { // Assign the current buffer to the last in the chain so it can be used // for future writes or written with end-of-stream=true on close. buffer = bufferList.get(bufferList.size() - 1); - statsTraceCtx.outboundWireSize(messageLength); + currentMessageWireSize = messageLength; } private static int writeToOutputStream(InputStream message, OutputStream outputStream) diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index 1326a1b856..a37a8cc1eb 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -150,27 +150,53 @@ public final class StatsTraceContext { } /** - * See {@link StreamTracer#outboundMessage}. + * See {@link StreamTracer#outboundMessage(int)}. * *

Called from {@link io.grpc.internal.Framer}. */ - public void outboundMessage() { + @SuppressWarnings("deprecation") + public void outboundMessage(int seqNo) { for (StreamTracer tracer : tracers) { + tracer.outboundMessage(seqNo); tracer.outboundMessage(); } } /** - * See {@link StreamTracer#inboundMessage}. + * See {@link StreamTracer#inboundMessage(int)}. * *

Called from {@link io.grpc.internal.MessageDeframer}. */ - public void inboundMessage() { + @SuppressWarnings("deprecation") + public void inboundMessage(int seqNo) { for (StreamTracer tracer : tracers) { + tracer.inboundMessage(seqNo); tracer.inboundMessage(); } } + /** + * See {@link StreamTracer#outboundMessageSent}. + * + *

Called from {@link io.grpc.internal.Framer}. + */ + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + for (StreamTracer tracer : tracers) { + tracer.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + } + + /** + * See {@link StreamTracer#inboundMessageRead}. + * + *

Called from {@link io.grpc.internal.MessageDeframer}. + */ + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + for (StreamTracer tracer : tracers) { + tracer.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + } + /** * See {@link StreamTracer#outboundUncompressedSize}. * diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index badd716516..7f9f6d9dea 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -16,8 +16,10 @@ package io.grpc.internal; +import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -239,7 +241,11 @@ public class AbstractClientStreamTest { // GET requests don't have BODY. verify(sink, never()) .writeFrame(any(WritableBuffer.class), any(Boolean.class), any(Boolean.class)); - assertEquals(1, tracer.getOutboundMessageCount()); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()"); + assertThat(tracer.nextOutboundEvent()).matches("outboundMessageSent\\(0, [0-9]+, [0-9]+\\)"); + assertNull(tracer.nextOutboundEvent()); + assertNull(tracer.nextInboundEvent()); assertEquals(1, tracer.getOutboundWireSize()); assertEquals(1, tracer.getOutboundUncompressedSize()); } diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 90b980d409..84cecd4625 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -31,8 +31,10 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; @@ -63,6 +65,7 @@ import io.opencensus.trace.AttributeValue; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.Link; import io.opencensus.trace.NetworkEvent; +import io.opencensus.trace.NetworkEvent.Type; import io.opencensus.trace.Sampler; import io.opencensus.trace.Span; import io.opencensus.trace.SpanBuilder; @@ -90,6 +93,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -166,6 +170,8 @@ public class CensusModulesTest { private ArgumentCaptor> clientCallListenerCaptor; @Captor private ArgumentCaptor statusCaptor; + @Captor + private ArgumentCaptor networkEventCaptor; private CensusStatsModule censusStats; private CensusTracingModule censusTracing; @@ -322,20 +328,20 @@ public class CensusModulesTest { tracer.outboundHeaders(); fakeClock.forwardTime(100, MILLISECONDS); - tracer.outboundMessage(); + tracer.outboundMessage(0); tracer.outboundWireSize(1028); tracer.outboundUncompressedSize(1128); fakeClock.forwardTime(16, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(0); tracer.inboundWireSize(33); tracer.inboundUncompressedSize(67); - tracer.outboundMessage(); + tracer.outboundMessage(1); tracer.outboundWireSize(99); tracer.outboundUncompressedSize(865); fakeClock.forwardTime(24, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.inboundUncompressedSize(552); tracer.streamClosed(Status.OK); @@ -372,11 +378,32 @@ public class CensusModulesTest { eq("Sent.package1.service2.method3"), isNull(Span.class)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, 90); + clientStreamTracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - verify(spyClientSpan).end( + InOrder inOrder = inOrder(spyClientSpan); + inOrder.verify(spyClientSpan, times(3)).addNetworkEvent(networkEventCaptor.capture()); + List events = networkEventCaptor.getAllValues(); + assertEquals( + NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0)); + assertEquals( + NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1)); + assertEquals( + NetworkEvent.builder(Type.RECV, 0) + .setCompressedMessageSize(255) + .setUncompressedMessageSize(90) + .build(), + events.get(2)); + inOrder.verify(spyClientSpan).end( EndSpanOptions.builder().setStatus(io.opencensus.trace.Status.OK).build()); + verifyNoMoreInteractions(spyClientSpan); verifyNoMoreInteractions(tracer); } @@ -424,7 +451,7 @@ public class CensusModulesTest { io.opencensus.trace.Status.DEADLINE_EXCEEDED .withDescription("3 seconds")) .build()); - verify(spyClientSpan, never()).end(); + verifyNoMoreInteractions(spyClientSpan); } @Test @@ -593,20 +620,20 @@ public class CensusModulesTest { .with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), statsCtx); - tracer.inboundMessage(); + tracer.inboundMessage(0); tracer.inboundWireSize(34); tracer.inboundUncompressedSize(67); fakeClock.forwardTime(100, MILLISECONDS); - tracer.outboundMessage(); + tracer.outboundMessage(0); tracer.outboundWireSize(1028); tracer.outboundUncompressedSize(1128); fakeClock.forwardTime(16, MILLISECONDS); - tracer.inboundMessage(); + tracer.inboundMessage(1); tracer.inboundWireSize(154); tracer.inboundUncompressedSize(552); - tracer.outboundMessage(); + tracer.outboundMessage(1); tracer.outboundWireSize(99); tracer.outboundUncompressedSize(865); @@ -648,12 +675,33 @@ public class CensusModulesTest { assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext)); verify(spyServerSpan, never()).end(any(EndSpanOptions.class)); + + serverStreamTracer.outboundMessage(0); + serverStreamTracer.outboundMessageSent(0, 882, -1); + serverStreamTracer.inboundMessage(0); + serverStreamTracer.outboundMessage(1); + serverStreamTracer.outboundMessageSent(1, -1, 27); + serverStreamTracer.inboundMessageRead(0, 255, 90); + serverStreamTracer.streamClosed(Status.CANCELLED); - verify(spyServerSpan).end( + InOrder inOrder = inOrder(spyServerSpan); + inOrder.verify(spyServerSpan, times(3)).addNetworkEvent(networkEventCaptor.capture()); + List events = networkEventCaptor.getAllValues(); + assertEquals( + NetworkEvent.builder(Type.SENT, 0).setCompressedMessageSize(882).build(), events.get(0)); + assertEquals( + NetworkEvent.builder(Type.SENT, 1).setUncompressedMessageSize(27).build(), events.get(1)); + assertEquals( + NetworkEvent.builder(Type.RECV, 0) + .setCompressedMessageSize(255) + .setUncompressedMessageSize(90) + .build(), + events.get(2)); + inOrder.verify(spyServerSpan).end( EndSpanOptions.builder() .setStatus(io.opencensus.trace.Status.CANCELLED).build()); - verify(spyServerSpan, never()).end(); + verifyNoMoreInteractions(spyServerSpan); } @Test @@ -713,6 +761,7 @@ public class CensusModulesTest { } @Override + @SuppressWarnings("deprecation") public void addAttributes(Map attributes) {} @Override diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index f0eb14a662..4e6039a6f5 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.atLeastOnce; @@ -77,7 +78,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -91,7 +92,7 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(streams.get(1).next())); verifyNoMoreInteractions(listener); - checkStats(2, 3, 3); + checkStats(1, 1, 2, 2); } @Test @@ -104,7 +105,7 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -113,7 +114,7 @@ public class MessageDeframerTest { deframer.closeWhenComplete(); verify(listener).deframerClosed(false); verifyNoMoreInteractions(listener); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -124,7 +125,7 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -139,7 +140,7 @@ public class MessageDeframerTest { Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 7, 7); + checkStats(7, 7); } @Test @@ -154,7 +155,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[] {3}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -165,7 +166,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -177,7 +178,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -192,7 +193,7 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, 1, 1); + checkStats(1, 1); } @Test @@ -209,7 +210,7 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(1, payload.length, 1000); + checkStats(payload.length, 1000); } @Test @@ -245,8 +246,7 @@ public class MessageDeframerTest { while (stream.read() != -1) {} stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -258,8 +258,7 @@ public class MessageDeframerTest { while (stream.read() != -1) {} stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -289,8 +288,7 @@ public class MessageDeframerTest { assertEquals(3, read); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -304,8 +302,7 @@ public class MessageDeframerTest { assertEquals(3, read); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -336,8 +333,7 @@ public class MessageDeframerTest { assertEquals(3, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -350,8 +346,7 @@ public class MessageDeframerTest { assertEquals(3, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } @Test @@ -384,15 +379,38 @@ public class MessageDeframerTest { assertEquals(2, skipped); stream.close(); - // SizeEnforcingInputStream only reports uncompressed bytes - checkStats(0, 0, 3); + checkSizeEnforcingInputStreamStats(3); } - private void checkStats( - int messagesReceived, long wireBytesReceived, long uncompressedBytesReceived) { - assertEquals(messagesReceived, tracer.getInboundMessageCount()); - assertEquals(wireBytesReceived, tracer.getInboundWireSize()); - assertEquals(uncompressedBytesReceived, tracer.getInboundUncompressedSize()); + /** + * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} + */ + private void checkStats(long... sizes) { + assertEquals(0, sizes.length % 2); + int count = sizes.length / 2; + long expectedWireSize = 0; + long expectedUncompressedSize = 0; + for (int i = 0; i < count; i++) { + assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent()); + assertEquals("inboundMessage()", tracer.nextInboundEvent()); + assertEquals( + String.format("inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]), + tracer.nextInboundEvent()); + expectedWireSize += sizes[i * 2]; + expectedUncompressedSize += sizes[i * 2 + 1]; + } + assertNull(tracer.nextInboundEvent()); + assertNull(tracer.nextOutboundEvent()); + assertEquals(expectedWireSize, tracer.getInboundWireSize()); + assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize()); + } + + private void checkSizeEnforcingInputStreamStats(long uncompressedSize) { + assertNull(tracer.nextInboundEvent()); + assertNull(tracer.nextOutboundEvent()); + assertEquals(0, tracer.getInboundWireSize()); + // SizeEnforcingInputStream only reports uncompressed bytes + assertEquals(uncompressedSize, tracer.getInboundUncompressedSize()); } private static List bytes(InputStream in) { diff --git a/core/src/test/java/io/grpc/internal/MessageFramerTest.java b/core/src/test/java/io/grpc/internal/MessageFramerTest.java index 68ed71ff6f..97477bc920 100644 --- a/core/src/test/java/io/grpc/internal/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageFramerTest.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; @@ -82,7 +83,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); assertEquals(1, allocator.allocCount); verifyNoMoreInteractions(sink); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -94,7 +95,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {3, 14}), false, true); assertEquals(2, allocator.allocCount); verifyNoMoreInteractions(sink); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -108,7 +109,7 @@ public class MessageFramerTest { toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(2, 2, 2); + checkStats(1, 1, 1, 1); } @Test @@ -120,7 +121,7 @@ public class MessageFramerTest { toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 7, 7); + checkStats(7, 7); } @Test @@ -129,7 +130,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(null, true, true); verifyNoMoreInteractions(sink); assertEquals(0, allocator.allocCount); - checkStats(0, 0, 0); + checkStats(); } @Test @@ -145,7 +146,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false, true); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); - checkStats(1, 8, 8); + checkStats(8, 8); } @Test @@ -162,7 +163,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBufferWithMinSize(new byte[] {1, 3}, 12), false, true); verifyNoMoreInteractions(sink); assertEquals(2, allocator.allocCount); - checkStats(2, 4, 4); + checkStats(3, 3, 1, 1); } @Test @@ -171,7 +172,7 @@ public class MessageFramerTest { framer.flush(); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); assertEquals(1, allocator.allocCount); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -182,7 +183,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); // One alloc for the header assertEquals(1, allocator.allocCount); - checkStats(1, 0, 0); + checkStats(0, 0); } @Test @@ -193,7 +194,7 @@ public class MessageFramerTest { verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false, true); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 2, 2); + checkStats(2, 2); } @Test @@ -213,7 +214,7 @@ public class MessageFramerTest { assertEquals(toWriteBuffer(data), buffer); verifyNoMoreInteractions(sink); assertEquals(1, allocator.allocCount); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -240,7 +241,7 @@ public class MessageFramerTest { verifyNoMoreInteractions(sink); assertEquals(3, allocator.allocCount); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -265,7 +266,7 @@ public class MessageFramerTest { assertTrue(length < 1000); assertEquals(frameCaptor.getAllValues().get(1).size(), length); - checkStats(1, length, 1000); + checkStats(length, 1000); } @Test @@ -290,7 +291,7 @@ public class MessageFramerTest { assertEquals(1000, length); assertEquals(buffer.data.length - 5 , length); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -316,7 +317,7 @@ public class MessageFramerTest { assertEquals(1000, length); assertEquals(buffer.data.length - 5 , length); - checkStats(1, 1000, 1000); + checkStats(1000, 1000); } @Test @@ -345,7 +346,7 @@ public class MessageFramerTest { writeKnownLength(framer, new byte[]{}); framer.flush(); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false, true); - checkStats(1, 0, 0); + checkStats(0, 0); } private static WritableBuffer toWriteBuffer(byte[] data) { @@ -367,13 +368,27 @@ public class MessageFramerTest { // TODO(carl-mastrangelo): add framer.flush() here. } - private void checkStats(int messagesSent, long wireBytesSent, long uncompressedBytesSent) { - long actualWireSize = 0; - long actualUncompressedSize = 0; - - assertEquals(messagesSent, tracer.getOutboundMessageCount()); - assertEquals(uncompressedBytesSent, tracer.getOutboundUncompressedSize()); - assertEquals(wireBytesSent, tracer.getOutboundWireSize()); + /** + * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} + */ + private void checkStats(long... sizes) { + assertEquals(0, sizes.length % 2); + int count = sizes.length / 2; + long expectedWireSize = 0; + long expectedUncompressedSize = 0; + for (int i = 0; i < count; i++) { + assertEquals("outboundMessage(" + i + ")", tracer.nextOutboundEvent()); + assertEquals("outboundMessage()", tracer.nextOutboundEvent()); + assertEquals( + String.format("outboundMessageSent(%d, %d, %d)", i, sizes[i * 2], sizes[i * 2 + 1]), + tracer.nextOutboundEvent()); + expectedWireSize += sizes[i * 2]; + expectedUncompressedSize += sizes[i * 2 + 1]; + } + assertNull(tracer.nextOutboundEvent()); + assertNull(tracer.nextInboundEvent()); + assertEquals(expectedWireSize, tracer.getOutboundWireSize()); + assertEquals(expectedUncompressedSize, tracer.getOutboundUncompressedSize()); } static class ByteWritableBuffer implements WritableBuffer { diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java index 039c24e19b..3115fa388c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java @@ -117,7 +117,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { } @Override - public void inboundMessage() { + public void inboundMessage(int seqNo) { anythingReceived.set(true); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index e18b341daa..83c7f43a07 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1789,18 +1789,29 @@ public abstract class AbstractInteropTest { TestStreamTracer tracer, Collection sentMessages, Collection receivedMessages) { - assertEquals(sentMessages.size(), tracer.getOutboundMessageCount()); - assertEquals(receivedMessages.size(), tracer.getInboundMessageCount()); - long uncompressedSentSize = 0; + int seqNo = 0; for (MessageLite msg : sentMessages) { + assertThat(tracer.nextOutboundEvent()).isEqualTo(String.format("outboundMessage(%d)", seqNo)); + assertThat(tracer.nextOutboundEvent()).isEqualTo("outboundMessage()"); + assertThat(tracer.nextOutboundEvent()).matches( + String.format( + "outboundMessageSent\\(%d, -?[0-9]+, %d\\)", seqNo, msg.getSerializedSize())); + seqNo++; uncompressedSentSize += msg.getSerializedSize(); } + assertNull(tracer.nextOutboundEvent()); long uncompressedReceivedSize = 0; + seqNo = 0; for (MessageLite msg : receivedMessages) { + assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo)); + assertThat(tracer.nextInboundEvent()).isEqualTo("inboundMessage()"); + assertThat(tracer.nextInboundEvent()).matches( + String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); uncompressedReceivedSize += msg.getSerializedSize(); + seqNo++; } - + assertNull(tracer.nextInboundEvent()); assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize()); assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize()); } diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 27cb94ffd9..b96b155995 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -732,7 +732,8 @@ public abstract class AbstractTransportTest { assertTrue(clientStream.isReady()); clientStream.writeMessage(methodDescriptor.streamRequest("Hello!")); if (metricsExpected()) { - assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()"); } clientStream.flush(); @@ -740,10 +741,12 @@ public abstract class AbstractTransportTest { assertEquals("Hello!", methodDescriptor.parseRequest(message)); message.close(); if (metricsExpected()) { - assertThat(clientStreamTracer1.getOutboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextOutboundEvent()) + .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); - assertEquals(1, serverStreamTracer1.getInboundMessageCount()); + assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); + assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()"); } assertNull("no additional message expected", serverStreamMessageQueue.poll()); @@ -753,6 +756,8 @@ public abstract class AbstractTransportTest { if (metricsExpected()) { assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.nextInboundEvent()) + .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); } Metadata serverHeaders = new Metadata(); @@ -774,7 +779,8 @@ public abstract class AbstractTransportTest { assertTrue(serverStream.isReady()); serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?")); if (metricsExpected()) { - assertEquals(1, serverStreamTracer1.getOutboundMessageCount()); + assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)"); + assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage()"); } serverStream.flush(); @@ -782,13 +788,18 @@ public abstract class AbstractTransportTest { .messagesAvailable(any(StreamListener.MessageProducer.class)); message = clientStreamMessageQueue.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS); if (metricsExpected()) { + assertThat(serverStreamTracer1.nextOutboundEvent()) + .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); assertTrue(clientStreamTracer1.getInboundHeaders()); - assertThat(clientStreamTracer1.getInboundMessageCount()).isGreaterThan(0); + assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); + assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage()"); } assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message)); if (metricsExpected()) { + assertThat(clientStreamTracer1.nextInboundEvent()) + .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); } @@ -804,6 +815,8 @@ public abstract class AbstractTransportTest { serverStream.close(status, trailers); if (metricsExpected()) { assertSame(status, serverStreamTracer1.getStatus()); + assertNull(serverStreamTracer1.nextInboundEvent()); + assertNull(serverStreamTracer1.nextOutboundEvent()); } verify(mockServerStreamListener, timeout(TIMEOUT_MS)).closed(statusCaptor.capture()); assertCodeEquals(Status.OK, statusCaptor.getValue()); @@ -811,6 +824,8 @@ public abstract class AbstractTransportTest { .closed(statusCaptor.capture(), metadataCaptor.capture()); if (metricsExpected()) { assertSame(statusCaptor.getValue(), clientStreamTracer1.getStatus()); + assertNull(clientStreamTracer1.nextInboundEvent()); + assertNull(clientStreamTracer1.nextOutboundEvent()); } assertEquals(status.getCode(), statusCaptor.getValue().getCode()); assertEquals(status.getDescription(), statusCaptor.getValue().getDescription()); @@ -1118,11 +1133,9 @@ public abstract class AbstractTransportTest { if (metricsExpected()) { assertTrue(clientStreamTracer1.getOutboundHeaders()); assertTrue(clientStreamTracer1.getInboundHeaders()); - assertEquals(1, clientStreamTracer1.getInboundMessageCount()); assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); assertSame(status, clientStreamTracer1.getStatus()); - assertEquals(1, serverStreamTracer1.getOutboundMessageCount()); assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); // There is a race between client cancelling and server closing. The final status seen by the diff --git a/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java index 7b57d41f6c..ee8c700801 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestClientStreamTracer.java @@ -53,11 +53,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return outboundHeadersCalled.get(); } - @Override - public int getInboundMessageCount() { - return delegate.getInboundMessageCount(); - } - @Override public Status getStatus() { return delegate.getStatus(); @@ -73,11 +68,6 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return delegate.getInboundUncompressedSize(); } - @Override - public int getOutboundMessageCount() { - return delegate.getOutboundMessageCount(); - } - @Override public long getOutboundWireSize() { return delegate.getOutboundWireSize(); @@ -88,6 +78,16 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt return delegate.getOutboundUncompressedSize(); } + @Override + public String nextOutboundEvent() { + return delegate.nextOutboundEvent(); + } + + @Override + public String nextInboundEvent() { + return delegate.nextInboundEvent(); + } + @Override public void outboundWireSize(long bytes) { delegate.outboundWireSize(bytes); @@ -114,15 +114,37 @@ public class TestClientStreamTracer extends ClientStreamTracer implements TestSt } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { delegate.inboundMessage(); } @Override + public void inboundMessage(int seqNo) { + delegate.inboundMessage(seqNo); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { delegate.outboundMessage(); } + @Override + public void outboundMessage(int seqNo) { + delegate.outboundMessage(seqNo); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + @Override public void outboundHeaders() { if (!outboundHeadersCalled.compareAndSet(false, true) diff --git a/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java index 83a2663e6f..9da476b823 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestServerStreamTracer.java @@ -47,11 +47,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return serverCall.get(); } - @Override - public int getInboundMessageCount() { - return delegate.getInboundMessageCount(); - } - @Override public Status getStatus() { return delegate.getStatus(); @@ -67,11 +62,6 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return delegate.getInboundUncompressedSize(); } - @Override - public int getOutboundMessageCount() { - return delegate.getOutboundMessageCount(); - } - @Override public long getOutboundWireSize() { return delegate.getOutboundWireSize(); @@ -82,6 +72,16 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt return delegate.getOutboundUncompressedSize(); } + @Override + public String nextOutboundEvent() { + return delegate.nextOutboundEvent(); + } + + @Override + public String nextInboundEvent() { + return delegate.nextInboundEvent(); + } + @Override public void outboundWireSize(long bytes) { delegate.outboundWireSize(bytes); @@ -108,15 +108,37 @@ public class TestServerStreamTracer extends ServerStreamTracer implements TestSt } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { delegate.inboundMessage(); } @Override + public void inboundMessage(int seqNo) { + delegate.inboundMessage(seqNo); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { delegate.outboundMessage(); } + @Override + public void outboundMessage(int seqNo) { + delegate.outboundMessage(seqNo); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); + } + @Override public void serverCallStarted(ServerCall call) { if (!serverCall.compareAndSet(null, call) && delegate.failDuplicateCallbacks.get()) { diff --git a/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java b/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java index 1a98a0f031..216c5b2a69 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestStreamTracer.java @@ -19,11 +19,12 @@ package io.grpc.internal.testing; import io.grpc.Status; import io.grpc.StreamTracer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; /** * A {@link StreamTracer} suitable for testing. @@ -40,16 +41,6 @@ public interface TestStreamTracer { */ boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException; - /** - * Returns how many times {@link StreamTracer#inboundMessage} has been called. - */ - int getInboundMessageCount(); - - /** - * Returns how many times {@link StreamTracer#outboundMessage} has been called. - */ - int getOutboundMessageCount(); - /** * Returns the status passed to {@link StreamTracer#streamClosed}. */ @@ -75,6 +66,17 @@ public interface TestStreamTracer { */ long getOutboundUncompressedSize(); + /** + * Returns the next captured outbound message event. + */ + @Nullable + String nextOutboundEvent(); + + /** + * Returns the next captured outbound message event. + */ + String nextInboundEvent(); + /** * A {@link StreamTracer} suitable for testing. */ @@ -84,8 +86,8 @@ public interface TestStreamTracer { protected final AtomicLong inboundWireSize = new AtomicLong(); protected final AtomicLong outboundUncompressedSize = new AtomicLong(); protected final AtomicLong inboundUncompressedSize = new AtomicLong(); - protected final AtomicInteger inboundMessageCount = new AtomicInteger(); - protected final AtomicInteger outboundMessageCount = new AtomicInteger(); + protected final LinkedBlockingQueue outboundEvents = new LinkedBlockingQueue(); + protected final LinkedBlockingQueue inboundEvents = new LinkedBlockingQueue(); protected final AtomicReference streamClosedStatus = new AtomicReference(); protected final CountDownLatch streamClosed = new CountDownLatch(1); protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true); @@ -100,16 +102,6 @@ public interface TestStreamTracer { return streamClosed.await(timeout, timeUnit); } - @Override - public int getInboundMessageCount() { - return inboundMessageCount.get(); - } - - @Override - public int getOutboundMessageCount() { - return outboundMessageCount.get(); - } - @Override public Status getStatus() { return streamClosedStatus.get(); @@ -167,13 +159,52 @@ public interface TestStreamTracer { } @Override + @SuppressWarnings("deprecation") public void inboundMessage() { - inboundMessageCount.incrementAndGet(); + inboundEvents.add("inboundMessage()"); } @Override + public void inboundMessage(int seqNo) { + inboundEvents.add("inboundMessage(" + seqNo + ")"); + } + + @Override + @SuppressWarnings("deprecation") public void outboundMessage() { - outboundMessageCount.incrementAndGet(); + outboundEvents.add("outboundMessage()"); + } + + @Override + public void outboundMessage(int seqNo) { + outboundEvents.add("outboundMessage(" + seqNo + ")"); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + outboundEvents.add( + String.format( + "outboundMessageSent(%d, %d, %d)", + seqNo, optionalWireSize, optionalUncompressedSize)); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + inboundEvents.add( + String.format( + "inboundMessageRead(%d, %d, %d)", seqNo, optionalWireSize, optionalUncompressedSize)); + } + + @Override + public String nextOutboundEvent() { + return outboundEvents.poll(); + } + + @Override + public String nextInboundEvent() { + return inboundEvents.poll(); } } }