From b65cbf508195fb86b3642b75185c2b150c700e31 Mon Sep 17 00:00:00 2001 From: MV Shiva Date: Thu, 24 Oct 2024 01:22:41 +0530 Subject: [PATCH] inprocess: Support tracing message sizes guarded by flag (#11629) --- .../grpc/internal/AbstractTransportTest.java | 45 +++++++--- .../inprocess/InProcessChannelBuilder.java | 4 + .../io/grpc/inprocess/InProcessTransport.java | 88 +++++++++++-------- .../inprocess/InProcessTransportTest.java | 10 ++- 4 files changed, 92 insertions(+), 55 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 75ea267870..69d8e65955 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -96,6 +96,9 @@ public abstract class AbstractTransportTest { private static final int TIMEOUT_MS = 5000; + protected static final String GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES = + "GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES"; + private static final Attributes.Key ADDITIONAL_TRANSPORT_ATTR_KEY = Attributes.Key.create("additional-attr"); @@ -238,6 +241,13 @@ public abstract class AbstractTransportTest { throw new UnsupportedOperationException(); } + /** + * Returns true if env var is set. + */ + protected static boolean isEnabledSupportTracingMessageSizes() { + return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES, false); + } + /** * Returns the current time, for tests that rely on the clock. */ @@ -850,16 +860,21 @@ public abstract class AbstractTransportTest { message.close(); assertThat(clientStreamTracer1.nextOutboundEvent()) .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); - assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); - assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + if (isEnabledSupportTracingMessageSizes()) { + assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); + assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + } + assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); assertNull("no additional message expected", serverStreamListener.messageQueue.poll()); clientStream.halfClose(); assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L); - assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + if (isEnabledSupportTracingMessageSizes()) { + assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + } assertThat(serverStreamTracer1.nextInboundEvent()) .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); @@ -890,15 +905,19 @@ public abstract class AbstractTransportTest { assertNotNull("message expected", message); assertThat(serverStreamTracer1.nextOutboundEvent()) .matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)"); - assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); - assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + if (isEnabledSupportTracingMessageSizes()) { + assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + } assertTrue(clientStreamTracer1.getInboundHeaders()); assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)"); assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message)); assertThat(clientStreamTracer1.nextInboundEvent()) .matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)"); - assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); - assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + if (isEnabledSupportTracingMessageSizes()) { + assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); + assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + } message.close(); assertNull("no additional message expected", clientStreamListener.messageQueue.poll()); @@ -1258,10 +1277,12 @@ public abstract class AbstractTransportTest { serverStream.close(Status.OK, new Metadata()); assertTrue(clientStreamTracer1.getOutboundHeaders()); assertTrue(clientStreamTracer1.getInboundHeaders()); - assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); - assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); - assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); - assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + if (isEnabledSupportTracingMessageSizes()) { + assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L); + assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L); + assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L); + } assertNull(clientStreamTracer1.getInboundTrailers()); assertSame(status, clientStreamTracer1.getStatus()); // There is a race between client cancelling and server closing. The final status seen by the diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 4e711a9400..9b33b3d361 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -18,6 +18,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.inprocess.InProcessTransport.isEnabledSupportTracingMessageSizes; import com.google.errorprone.annotations.DoNotCall; import io.grpc.ChannelCredentials; @@ -118,6 +119,9 @@ public final class InProcessChannelBuilder extends managedChannelImplBuilder.setStatsRecordStartedRpcs(false); managedChannelImplBuilder.setStatsRecordFinishedRpcs(false); managedChannelImplBuilder.setStatsRecordRetryMetrics(false); + if (!isEnabledSupportTracingMessageSizes) { + managedChannelImplBuilder.disableRetry(); + } } @Internal diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 0d173b9671..4212d96e9f 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -82,6 +82,8 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe final class InProcessTransport implements ServerTransport, ConnectionClientTransport { private static final Logger log = Logger.getLogger(InProcessTransport.class.getName()); + static boolean isEnabledSupportTracingMessageSizes = + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false); private final InternalLogId logId; private final SocketAddress address; @@ -485,22 +487,25 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @Override public void writeMessage(InputStream message) { - long messageLength; - try { - if (assumedMessageSize != -1) { - messageLength = assumedMessageSize; - } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) { - messageLength = message.available(); - } else { - InputStream oldMessage = message; - byte[] payload = ByteStreams.toByteArray(message); - messageLength = payload.length; - message = new ByteArrayInputStream(payload); - oldMessage.close(); + long messageLength = 0; + if (isEnabledSupportTracingMessageSizes) { + try { + if (assumedMessageSize != -1) { + messageLength = assumedMessageSize; + } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) { + messageLength = message.available(); + } else { + InputStream oldMessage = message; + byte[] payload = ByteStreams.toByteArray(message); + messageLength = payload.length; + message = new ByteArrayInputStream(payload); + oldMessage.close(); + } + } catch (Exception e) { + throw new RuntimeException("Error processing the message length", e); } - } catch (Exception e) { - throw new RuntimeException("Error processing the message length", e); } + synchronized (this) { if (closed) { return; @@ -509,11 +514,13 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); clientStream.statsTraceCtx.inboundMessage(outboundSeqNo); clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); - statsTraceCtx.outboundUncompressedSize(messageLength); - statsTraceCtx.outboundWireSize(messageLength); - // messageLength should be same at receiver's end as no actual wire is involved. - clientStream.statsTraceCtx.inboundUncompressedSize(messageLength); - clientStream.statsTraceCtx.inboundWireSize(messageLength); + if (isEnabledSupportTracingMessageSizes) { + statsTraceCtx.outboundUncompressedSize(messageLength); + statsTraceCtx.outboundWireSize(messageLength); + // messageLength should be same at receiver's end as no actual wire is involved. + clientStream.statsTraceCtx.inboundUncompressedSize(messageLength); + clientStream.statsTraceCtx.inboundWireSize(messageLength); + } outboundSeqNo++; StreamListener.MessageProducer producer = new SingleMessageProducer(message); if (clientRequested > 0) { @@ -523,7 +530,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans clientReceiveQueue.add(producer); } } - syncContext.drain(); } @@ -777,21 +783,23 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @Override public void writeMessage(InputStream message) { - long messageLength; - try { - if (assumedMessageSize != -1) { - messageLength = assumedMessageSize; - } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) { - messageLength = message.available(); - } else { - InputStream oldMessage = message; - byte[] payload = ByteStreams.toByteArray(message); - messageLength = payload.length; - message = new ByteArrayInputStream(payload); - oldMessage.close(); + long messageLength = 0; + if (isEnabledSupportTracingMessageSizes) { + try { + if (assumedMessageSize != -1) { + messageLength = assumedMessageSize; + } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) { + messageLength = message.available(); + } else { + InputStream oldMessage = message; + byte[] payload = ByteStreams.toByteArray(message); + messageLength = payload.length; + message = new ByteArrayInputStream(payload); + oldMessage.close(); + } + } catch (Exception e) { + throw new RuntimeException("Error processing the message length", e); } - } catch (Exception e) { - throw new RuntimeException("Error processing the message length", e); } synchronized (this) { if (closed) { @@ -801,11 +809,13 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1); serverStream.statsTraceCtx.inboundMessage(outboundSeqNo); serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1); - statsTraceCtx.outboundUncompressedSize(messageLength); - statsTraceCtx.outboundWireSize(messageLength); - // messageLength should be same at receiver's end as no actual wire is involved. - serverStream.statsTraceCtx.inboundUncompressedSize(messageLength); - serverStream.statsTraceCtx.inboundWireSize(messageLength); + if (isEnabledSupportTracingMessageSizes) { + statsTraceCtx.outboundUncompressedSize(messageLength); + statsTraceCtx.outboundWireSize(messageLength); + // messageLength should be same at receiver's end as no actual wire is involved. + serverStream.statsTraceCtx.inboundUncompressedSize(messageLength); + serverStream.statsTraceCtx.inboundWireSize(messageLength); + } outboundSeqNo++; StreamListener.MessageProducer producer = new SingleMessageProducer(message); if (serverRequested > 0) { diff --git a/inprocess/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/inprocess/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 87d7467c11..3ed8dd24ca 100644 --- a/inprocess/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/inprocess/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -234,9 +234,11 @@ public class InProcessTransportTest extends AbstractTransportTest { private void assertAssumedMessageSize( TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) { - Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize()); - Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize()); - Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize()); - Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize()); + if (isEnabledSupportTracingMessageSizes()) { + Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize()); + Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize()); + Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize()); + Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize()); + } } }