report uncompressed message size when it does not need compression (#11598)

This commit is contained in:
yifeizhuang 2024-10-07 10:44:27 -07:00 committed by GitHub
parent 1ded8aff81
commit 2aae68e117
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 21 additions and 16 deletions

View File

@ -406,7 +406,8 @@ public class MessageDeframer implements Closeable, Deframer {
// There is no reliable way to get the uncompressed size per message when it's compressed, // There is no reliable way to get the uncompressed size per message when it's compressed,
// because the uncompressed bytes are provided through an InputStream whose total size is // because the uncompressed bytes are provided through an InputStream whose total size is
// unknown until all bytes are read, and we don't know when it happens. // unknown until all bytes are read, and we don't know when it happens.
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1); statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize,
(compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize);
inboundBodyWireSize = 0; inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody(); InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame.touch(); nextFrame.touch();

View File

@ -133,7 +133,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next())); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 2, 2);
} }
@Test @Test
@ -148,7 +148,7 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next())); assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next()));
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1, 2, 2);
} }
@Test @Test
@ -162,7 +162,7 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
} }
@Test @Test
@ -177,7 +177,7 @@ public class MessageDeframerTest {
} }
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock); checkStats(tracer, transportTracer.getStats(), fakeClock, false);
} }
@Test @Test
@ -189,7 +189,7 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true); verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock); checkStats(tracer, transportTracer.getStats(), fakeClock, false);
} }
@Test @Test
@ -206,7 +206,7 @@ public class MessageDeframerTest {
deframer.closeWhenComplete(); deframer.closeWhenComplete();
verify(listener).deframerClosed(true); verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock); checkStats(tracer, transportTracer.getStats(), fakeClock, false);
} }
@Test @Test
@ -228,10 +228,11 @@ public class MessageDeframerTest {
tracer, tracer,
transportTracer.getStats(), transportTracer.getStats(),
fakeClock, fakeClock,
true,
7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */, 7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */,
7); 7);
} else { } else {
checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7); checkStats(tracer, transportTracer.getStats(), fakeClock, false, 7, 7);
} }
} }
@ -248,7 +249,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next())); assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
} }
@Test @Test
@ -259,7 +260,7 @@ public class MessageDeframerTest {
assertEquals(Bytes.asList(), bytes(producer.getValue().next())); assertEquals(Bytes.asList(), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 0, 0);
} }
@Test @Test
@ -273,9 +274,10 @@ public class MessageDeframerTest {
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
if (useGzipInflatingBuffer) { if (useGzipInflatingBuffer) {
checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000); checkStats(tracer, transportTracer.getStats(), fakeClock,true,
8 /* compressed size */, 1000);
} else { } else {
checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000); checkStats(tracer, transportTracer.getStats(), fakeClock, false, 1000, 1000);
} }
} }
@ -292,7 +294,7 @@ public class MessageDeframerTest {
verify(listener).deframerClosed(false); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1); checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
} }
@Test @Test
@ -308,6 +310,7 @@ public class MessageDeframerTest {
verify(listener).messagesAvailable(producer.capture()); verify(listener).messagesAvailable(producer.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next())); assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
checkStats(tracer, transportTracer.getStats(), fakeClock, true, 29, 1000);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
} }
@ -502,7 +505,8 @@ public class MessageDeframerTest {
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...} * @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/ */
private static void checkStats( private static void checkStats(
TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) { TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock,
boolean compressed, long... sizes) {
assertEquals(0, sizes.length % 2); assertEquals(0, sizes.length % 2);
int count = sizes.length / 2; int count = sizes.length / 2;
long expectedWireSize = 0; long expectedWireSize = 0;
@ -510,7 +514,8 @@ public class MessageDeframerTest {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent()); assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent());
assertEquals( assertEquals(
String.format(Locale.US, "inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]), String.format(Locale.US, "inboundMessageRead(%d, %d, %d)", i, sizes[i * 2],
compressed ? -1 : sizes[i * 2 + 1]),
tracer.nextInboundEvent()); tracer.nextInboundEvent());
expectedWireSize += sizes[i * 2]; expectedWireSize += sizes[i * 2];
expectedUncompressedSize += sizes[i * 2 + 1]; expectedUncompressedSize += sizes[i * 2 + 1];

View File

@ -217,7 +217,6 @@ final class OpenTelemetryTracingModule {
@Override @Override
public void inboundMessageRead( public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) { int seqNo, long optionalWireSize, long optionalUncompressedSize) {
//TODO(yifeizhuang): needs support from message deframer.
if (optionalWireSize != optionalUncompressedSize) { if (optionalWireSize != optionalUncompressedSize) {
recordInboundCompressedMessage(span, seqNo, optionalWireSize); recordInboundCompressedMessage(span, seqNo, optionalWireSize);
} }