core: use fakeClock in MessageDeframer tests to fix flaky test (#5055)

This commit is contained in:
Jihun Cho 2018-11-14 15:13:10 -08:00 committed by GitHub
parent ea9bdabcb2
commit ab5257504b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 34 additions and 20 deletions

View File

@ -85,13 +85,16 @@ public class MessageDeframerTest {
}); });
} }
private final FakeClock fakeClock = new FakeClock();
@Parameter // Automatically set by test runner, must be public @Parameter // Automatically set by test runner, must be public
public boolean useGzipInflatingBuffer; public boolean useGzipInflatingBuffer;
private Listener listener = mock(Listener.class); private Listener listener = mock(Listener.class);
private TestBaseStreamTracer tracer = new TestBaseStreamTracer(); private TestBaseStreamTracer tracer = new TestBaseStreamTracer();
private StatsTraceContext statsTraceCtx = new StatsTraceContext(new StreamTracer[]{tracer}); private StatsTraceContext statsTraceCtx = new StatsTraceContext(new StreamTracer[]{tracer});
private TransportTracer transportTracer = new TransportTracer(); private TransportTracer transportTracer =
new TransportTracer.Factory(fakeClock.getTimeProvider()).create();
private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE, private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE,
DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer); DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
@ -123,17 +126,19 @@ public class MessageDeframerTest {
@Test @Test
public void simplePayload() { public void simplePayload() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14})); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}));
verify(listener).messagesAvailable(producer.capture()); verify(listener).messagesAvailable(producer.capture());
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next())); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 2, 2); checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2);
} }
@Test @Test
public void smallCombinedPayloads() { public void smallCombinedPayloads() {
deframer.request(2); deframer.request(2);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15})); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}));
verify(listener, times(2)).messagesAvailable(producer.capture()); verify(listener, times(2)).messagesAvailable(producer.capture());
List<StreamListener.MessageProducer> streams = producer.getAllValues(); List<StreamListener.MessageProducer> streams = producer.getAllValues();
@ -142,12 +147,13 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next())); assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next()));
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 1, 1, 2, 2); checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2);
} }
@Test @Test
public void endOfStreamWithPayloadShouldNotifyEndOfStream() { public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3})); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3}));
deframer.closeWhenComplete(); deframer.closeWhenComplete();
verify(listener).messagesAvailable(producer.capture()); verify(listener).messagesAvailable(producer.capture());
@ -155,11 +161,12 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
} }
@Test @Test
public void endOfStreamShouldNotifyEndOfStream() { public void endOfStreamShouldNotifyEndOfStream() {
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[0])); deframer.deframe(buffer(new byte[0]));
deframer.closeWhenComplete(); deframer.closeWhenComplete();
deframer.request(1); deframer.request(1);
@ -169,18 +176,19 @@ public class MessageDeframerTest {
} }
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats()); checkStats(tracer, transportTracer.getStats(), fakeClock);
} }
@Test @Test
public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMessage() { public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMessage() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[1])); deframer.deframe(buffer(new byte[1]));
deframer.closeWhenComplete(); deframer.closeWhenComplete();
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true); verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats()); checkStats(tracer, transportTracer.getStats(), fakeClock);
} }
@Test @Test
@ -192,16 +200,18 @@ public class MessageDeframerTest {
DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer); DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, transportTracer);
deframer.setFullStreamDecompressor(new GzipInflatingBuffer()); deframer.setFullStreamDecompressor(new GzipInflatingBuffer());
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[1])); deframer.deframe(buffer(new byte[1]));
deframer.closeWhenComplete(); deframer.closeWhenComplete();
verify(listener).deframerClosed(true); verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats()); checkStats(tracer, transportTracer.getStats(), fakeClock);
} }
@Test @Test
public void payloadSplitBetweenBuffers() { public void payloadSplitBetweenBuffers() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 7, 3, 14, 1, 5, 9})); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 7, 3, 14, 1, 5, 9}));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
@ -215,27 +225,29 @@ public class MessageDeframerTest {
if (useGzipInflatingBuffer) { if (useGzipInflatingBuffer) {
checkStats( checkStats(
tracer, tracer,
transportTracer.getStats(), transportTracer.getStats(),
fakeClock,
7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */, 7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */,
7); 7);
} else { } else {
checkStats(tracer, transportTracer.getStats(), 7, 7); checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7);
} }
} }
@Test @Test
public void frameHeaderSplitBetweenBuffers() { public void frameHeaderSplitBetweenBuffers() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0})); deframer.deframe(buffer(new byte[]{0, 0}));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 1, 3})); deframer.deframe(buffer(new byte[]{0, 0, 1, 3}));
verify(listener).messagesAvailable(producer.capture()); verify(listener).messagesAvailable(producer.capture());
assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next())); assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
} }
@Test @Test
@ -246,12 +258,13 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(), bytes(producer.getValue().next())); assertEquals(Bytes.asList(), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 0, 0); checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0);
} }
@Test @Test
public void largerFrameSize() { public void largerFrameSize() {
deframer.request(1); deframer.request(1);
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(ReadableBuffers.wrap( deframer.deframe(ReadableBuffers.wrap(
Bytes.concat(new byte[]{0, 0, 0, 3, (byte) 232}, new byte[1000]))); Bytes.concat(new byte[]{0, 0, 0, 3, (byte) 232}, new byte[1000])));
verify(listener).messagesAvailable(producer.capture()); verify(listener).messagesAvailable(producer.capture());
@ -259,14 +272,15 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
if (useGzipInflatingBuffer) { if (useGzipInflatingBuffer) {
checkStats(tracer, transportTracer.getStats(), 8 /* compressed size */, 1000); checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000);
} else { } else {
checkStats(tracer, transportTracer.getStats(), 1000, 1000); checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000);
} }
} }
@Test @Test
public void endOfStreamCallbackShouldWaitForMessageDelivery() { public void endOfStreamCallbackShouldWaitForMessageDelivery() {
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3})); deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3}));
deframer.closeWhenComplete(); deframer.closeWhenComplete();
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
@ -277,7 +291,7 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
} }
@Test @Test
@ -480,10 +494,11 @@ public class MessageDeframerTest {
/** /**
* @param transportStats the transport level stats counters * @param transportStats the transport level stats counters
* @param clock the fakeClock to verify timestamp
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/ */
private static void checkStats( private static void checkStats(
TestBaseStreamTracer tracer, TransportStats transportStats, long... sizes) { TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) {
assertEquals(0, sizes.length % 2); assertEquals(0, sizes.length % 2);
int count = sizes.length / 2; int count = sizes.length / 2;
long expectedWireSize = 0; long expectedWireSize = 0;
@ -502,12 +517,11 @@ public class MessageDeframerTest {
assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize()); assertEquals(expectedUncompressedSize, tracer.getInboundUncompressedSize());
assertEquals(count, transportStats.messagesReceived); assertEquals(count, transportStats.messagesReceived);
long transportReceiveMsgMs = TimeUnit.NANOSECONDS.toMillis(
transportStats.lastMessageReceivedTimeNanos);
if (count > 0) { if (count > 0) {
assertThat(System.currentTimeMillis() - transportReceiveMsgMs).isAtMost(50L); assertThat(transportStats.lastMessageReceivedTimeNanos)
.isEqualTo(clock.getTimeProvider().currentTimeNanos());
} else { } else {
assertEquals(0, transportReceiveMsgMs); assertEquals(0L, transportStats.lastMessageReceivedTimeNanos);
} }
} }