diff --git a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java index 8b6fc1da94..1eb72857c5 100644 --- a/core/src/test/java/io/grpc/internal/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/internal/MessageDeframerTest.java @@ -85,13 +85,16 @@ public class MessageDeframerTest { }); } + private final FakeClock fakeClock = new FakeClock(); + @Parameter // Automatically set by test runner, must be public public boolean useGzipInflatingBuffer; private Listener listener = mock(Listener.class); private TestBaseStreamTracer tracer = new TestBaseStreamTracer(); private StatsTraceContext statsTraceCtx = new StatsTraceContext(new StreamTracer[]{tracer}); - private TransportTracer transportTracer = new TransportTracer(); + private TransportTracer transportTracer = + new TransportTracer.Factory(fakeClock.getTimeProvider()).create(); private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer); @@ -123,17 +126,19 @@ public class MessageDeframerTest { @Test public void simplePayload() { deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14})); verify(listener).messagesAvailable(producer.capture()); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 2, 2); + checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2); } @Test public void smallCombinedPayloads() { deframer.request(2); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15})); verify(listener, times(2)).messagesAvailable(producer.capture()); List streams = producer.getAllValues(); @@ -142,12 +147,13 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next())); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 1, 1, 2, 2); + checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2); } @Test public void endOfStreamWithPayloadShouldNotifyEndOfStream() { deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3})); deframer.closeWhenComplete(); verify(listener).messagesAvailable(producer.capture()); @@ -155,11 +161,12 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); } @Test public void endOfStreamShouldNotifyEndOfStream() { + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[0])); deframer.closeWhenComplete(); deframer.request(1); @@ -169,18 +176,19 @@ public class MessageDeframerTest { } verify(listener).deframerClosed(false); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats()); + checkStats(tracer, transportTracer.getStats(), fakeClock); } @Test public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMessage() { deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[1])); deframer.closeWhenComplete(); verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats()); + checkStats(tracer, transportTracer.getStats(), fakeClock); } @Test @@ -192,16 +200,18 @@ public class MessageDeframerTest { DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer); deframer.setFullStreamDecompressor(new GzipInflatingBuffer()); deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[1])); deframer.closeWhenComplete(); verify(listener).deframerClosed(true); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats()); + checkStats(tracer, transportTracer.getStats(), fakeClock); } @Test public void payloadSplitBetweenBuffers() { deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 7, 3, 14, 1, 5, 9})); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); @@ -215,27 +225,29 @@ public class MessageDeframerTest { if (useGzipInflatingBuffer) { checkStats( tracer, - transportTracer.getStats(), + transportTracer.getStats(), + fakeClock, 7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */, 7); } else { - checkStats(tracer, transportTracer.getStats(), 7, 7); + checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7); } } @Test public void frameHeaderSplitBetweenBuffers() { deframer.request(1); - + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0})); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 1, 3})); verify(listener).messagesAvailable(producer.capture()); assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); } @Test @@ -246,12 +258,13 @@ public class MessageDeframerTest { assertEquals(Bytes.asList(), bytes(producer.getValue().next())); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 0, 0); + checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0); } @Test public void largerFrameSize() { deframer.request(1); + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(ReadableBuffers.wrap( Bytes.concat(new byte[]{0, 0, 0, 3, (byte) 232}, new byte[1000]))); verify(listener).messagesAvailable(producer.capture()); @@ -259,14 +272,15 @@ public class MessageDeframerTest { verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); if (useGzipInflatingBuffer) { - checkStats(tracer, transportTracer.getStats(), 8 /* compressed size */, 1000); + checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000); } else { - checkStats(tracer, transportTracer.getStats(), 1000, 1000); + checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000); } } @Test public void endOfStreamCallbackShouldWaitForMessageDelivery() { + fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3})); deframer.closeWhenComplete(); verifyNoMoreInteractions(listener); @@ -277,7 +291,7 @@ public class MessageDeframerTest { verify(listener).deframerClosed(false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - checkStats(tracer, transportTracer.getStats(), 1, 1); + checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); } @Test @@ -480,10 +494,11 @@ public class MessageDeframerTest { /** * @param transportStats the transport level stats counters + * @param clock the fakeClock to verify timestamp * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} */ private static void checkStats( - TestBaseStreamTracer tracer, TransportStats transportStats, long... sizes) { + TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) { assertEquals(0, sizes.length % 2); int count = sizes.length / 2; long expectedWireSize = 0; @@ -502,12 +517,11 @@ public class MessageDeframerTest { assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize()); assertEquals(count, transportStats.messagesReceived); - long transportReceiveMsgMs = TimeUnit.NANOSECONDS.toMillis( - transportStats.lastMessageReceivedTimeNanos); if (count > 0) { - assertThat(System.currentTimeMillis() - transportReceiveMsgMs).isAtMost(50L); + assertThat(transportStats.lastMessageReceivedTimeNanos) + .isEqualTo(clock.getTimeProvider().currentTimeNanos()); } else { - assertEquals(0, transportReceiveMsgMs); + assertEquals(0L, transportStats.lastMessageReceivedTimeNanos); } }