diff --git a/core/src/main/java/io/grpc/internal/AbstractStream2.java b/core/src/main/java/io/grpc/internal/AbstractStream2.java index ba9821cc9e..28d8a89034 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream2.java @@ -34,7 +34,6 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.annotations.VisibleForTesting; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Decompressor; @@ -124,6 +123,7 @@ public abstract class AbstractStream2 implements Stream { private final MessageDeframer deframer; private final Object onReadyLock = new Object(); + private final StatsTraceContext statsTraceCtx; /** * The number of bytes currently queued, waiting to be sent. When this falls below * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called. @@ -144,15 +144,11 @@ public abstract class AbstractStream2 implements Stream { private boolean deallocated; protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) { + this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); deframer = new MessageDeframer( this, Codec.Identity.NONE, maxMessageSize, statsTraceCtx, getClass().getName()); } - @VisibleForTesting - TransportState(MessageDeframer deframer) { - this.deframer = deframer; - } - final void setMaxInboundMessageSize(int maxSize) { deframer.setMaxInboundMessageSize(maxSize); } @@ -220,6 +216,10 @@ public abstract class AbstractStream2 implements Stream { } } + public final StatsTraceContext getStatsTraceContext() { + return statsTraceCtx; + } + private void setDecompressor(Decompressor decompressor) { if (deframer.isClosed()) { return; diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index f15db8b2cd..5ce21678a3 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -32,6 +32,7 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.instrumentation.stats.MeasurementDescriptor; @@ -64,6 +65,8 @@ public final class StatsTraceContext { "noopservice/noopmethod", NoopStatsContextFactory.INSTANCE, GrpcUtil.STOPWATCH_SUPPLIER); + private static final double NANOS_PER_MILLI = 1000 * 1000; + private enum Side { CLIENT, SERVER } @@ -72,6 +75,7 @@ public final class StatsTraceContext { private final Stopwatch stopwatch; private final Side side; private final Metadata.Key statsHeader; + private volatile long clientPendingNanos = -1; private volatile long wireBytesSent; private volatile long wireBytesReceived; private volatile long uncompressedBytesSent; @@ -204,6 +208,18 @@ public final class StatsTraceContext { uncompressedBytesReceived += bytes; } + /** + * Mark the time when the headers, which are the first bytes of the RPC, are sent from the client. + * This is specific to transport implementation, thus should be called from transports. Calling + * it the second time or more is a no-op. + */ + public void clientHeadersSent() { + Preconditions.checkState(side == Side.CLIENT, "Must be called on client-side"); + if (clientPendingNanos < 0) { + clientPendingNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + } + } + /** * Record a finished all and mark the current time as the end time. * @@ -233,14 +249,21 @@ public final class StatsTraceContext { uncompressedBytesSentMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES; uncompressedBytesReceivedMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES; } - statsCtx - .with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(MeasurementMap.builder() - .put(latencyMetric, stopwatch.elapsed(TimeUnit.MILLISECONDS)) - .put(wireBytesSentMetric, wireBytesSent) - .put(wireBytesReceivedMetric, wireBytesReceived) - .put(uncompressedBytesSentMetric, uncompressedBytesSent) - .put(uncompressedBytesReceivedMetric, uncompressedBytesReceived) - .build()); + long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); + MeasurementMap.Builder builder = MeasurementMap.builder() + .put(latencyMetric, roundtripNanos / NANOS_PER_MILLI) // in double + .put(wireBytesSentMetric, wireBytesSent) + .put(wireBytesReceivedMetric, wireBytesReceived) + .put(uncompressedBytesSentMetric, uncompressedBytesSent) + .put(uncompressedBytesReceivedMetric, uncompressedBytesReceived); + if (side == Side.CLIENT) { + if (clientPendingNanos >= 0) { + builder.put( + RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME, + (roundtripNanos - clientPendingNanos) / NANOS_PER_MILLI); // in double + } + } + statsCtx.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .record(builder.build()); } } diff --git a/core/src/test/java/io/grpc/internal/StatsTraceContextTest.java b/core/src/test/java/io/grpc/internal/StatsTraceContextTest.java new file mode 100644 index 0000000000..473bf7d9b1 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/StatsTraceContextTest.java @@ -0,0 +1,237 @@ +/* + * Copyright 2017, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.google.instrumentation.stats.RpcConstants; +import com.google.instrumentation.stats.StatsContext; +import com.google.instrumentation.stats.TagValue; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test for {@link StatsTraceContext}. + */ +@RunWith(JUnit4.class) +public class StatsTraceContextTest { + private FakeClock fakeClock = new FakeClock(); + private FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); + + @After + public void allRecordsVerified() { + assertNull(statsCtxFactory.pollRecord()); + } + + @Test + public void clientBasic() { + String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1"); + StatsTraceContext ctx = StatsTraceContext.newClientContextForTesting( + methodName, statsCtxFactory, statsCtxFactory.getDefault(), + fakeClock.getStopwatchSupplier()); + fakeClock.forwardMillis(30); + ctx.clientHeadersSent(); + + fakeClock.forwardMillis(100); + ctx.wireBytesSent(1028); + ctx.uncompressedBytesSent(1128); + + fakeClock.forwardMillis(16); + ctx.wireBytesReceived(33); + ctx.uncompressedBytesReceived(67); + ctx.wireBytesSent(99); + ctx.uncompressedBytesSent(865); + + fakeClock.forwardMillis(24); + ctx.wireBytesReceived(154); + ctx.uncompressedBytesReceived(552); + ctx.callEnded(Status.OK); + + StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + assertNotNull(record); + assertNoServerContent(record); + TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); + assertEquals(methodName, methodTag.toString()); + TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.toString()); + assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals(1128 + 865, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertEquals(67 + 552, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(30 + 100 + 16 + 24, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertEquals(100 + 16 + 24, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + } + + @Test + public void clientNotSent() { + String methodName = MethodDescriptor.generateFullMethodName("Service1", "method2"); + StatsTraceContext ctx = StatsTraceContext.newClientContextForTesting( + methodName, statsCtxFactory, statsCtxFactory.getDefault(), + fakeClock.getStopwatchSupplier()); + fakeClock.forwardMillis(3000); + ctx.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); + + StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + assertNotNull(record); + assertNoServerContent(record); + TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); + assertEquals(methodName, methodTag.toString()); + TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString()); + assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals(0, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertEquals(0, + record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + } + + /** + * Tags that are propagated by the {@link StatsContextFactory} are properly propagated via + * the headers. + */ + @Test + public void tagPropagation() { + String methodName = MethodDescriptor.generateFullMethodName("Service1", "method3"); + + // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are + // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates + // the propagation by putting them in the headers. + StatsContext parentCtx = statsCtxFactory.getDefault().with( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")); + StatsTraceContext clientCtx = StatsTraceContext.newClientContextForTesting( + methodName, statsCtxFactory, parentCtx, fakeClock.getStopwatchSupplier()); + Metadata headers = new Metadata(); + clientCtx.propagateToHeaders(headers); + + // The server gets the propagated tag from the headers, and puts it on the server-side + // StatsContext. + StatsTraceContext serverCtx = StatsTraceContext.newServerContext( + methodName, statsCtxFactory, headers, fakeClock.getStopwatchSupplier()); + + serverCtx.callEnded(Status.OK); + clientCtx.callEnded(Status.OK); + + StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord(); + assertNotNull(serverRecord); + assertNoClientContent(serverRecord); + TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD); + assertEquals(methodName, serverMethodTag.toString()); + TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), serverStatusTag.toString()); + TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); + assertEquals("extra-tag-value-897", serverPropagatedTag.toString()); + + StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord(); + assertNotNull(clientRecord); + assertNoServerContent(clientRecord); + TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD); + assertEquals(methodName, clientMethodTag.toString()); + TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), clientStatusTag.toString()); + TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); + assertEquals("extra-tag-value-897", clientPropagatedTag.toString()); + } + + @Test + public void serverBasic() { + String methodName = MethodDescriptor.generateFullMethodName("Service1", "method4"); + StatsTraceContext ctx = StatsTraceContext.newServerContext( + methodName, statsCtxFactory, new Metadata(), fakeClock.getStopwatchSupplier()); + ctx.wireBytesReceived(34); + ctx.uncompressedBytesReceived(67); + + fakeClock.forwardMillis(100); + ctx.wireBytesSent(1028); + ctx.uncompressedBytesSent(1128); + + fakeClock.forwardMillis(16); + ctx.wireBytesReceived(154); + ctx.uncompressedBytesReceived(552); + ctx.wireBytesSent(99); + ctx.uncompressedBytesSent(865); + + fakeClock.forwardMillis(24); + ctx.callEnded(Status.CANCELLED); + + StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + assertNotNull(record); + assertNoClientContent(record); + TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD); + assertEquals(methodName, methodTag.toString()); + TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString()); + assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); + assertEquals(1128 + 865, + record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES)); + assertEquals(67 + 552, + record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(100 + 16 + 24, + record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + } + + private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { + assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + } + + private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) { + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 2431adc358..2b8c51b305 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -1431,6 +1431,7 @@ public abstract class AbstractInteropTest { record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); assertEquals(uncompressedResponsesSize, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index c2e5f8a1a9..c9b12a767a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -419,6 +419,7 @@ class NettyClientHandler extends AbstractNettyHandler { // was canceled via RST_STREAM. Http2Stream http2Stream = connection().stream(streamId); if (http2Stream != null) { + stream.getStatsTraceContext().clientHeadersSent(); http2Stream.setProperty(streamKey, stream); // Attach the client stream to the HTTP/2 stream object as user data. diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 3aad048ca1..465db93a45 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -72,6 +72,7 @@ class OkHttpClientStream extends Http2ClientStream { private final OkHttpClientTransport transport; private final Object lock; private final String userAgent; + private final StatsTraceContext statsTraceCtx; private String authority; private Object outboundFlowState; private volatile int id = ABSENT_ID; @@ -98,6 +99,7 @@ class OkHttpClientStream extends Http2ClientStream { String userAgent, StatsTraceContext statsTraceCtx) { super(new OkHttpWritableBufferAllocator(), maxMessageSize, statsTraceCtx); + this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); this.method = method; this.headers = headers; this.frameWriter = frameWriter; @@ -160,6 +162,7 @@ class OkHttpClientStream extends Http2ClientStream { if (pendingData != null) { // Only happens when the stream has neither been started nor cancelled. frameWriter.synStream(false, false, id, 0, requestHeaders); + statsTraceCtx.clientHeadersSent(); requestHeaders = null; boolean flush = false; diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 2d1f5985af..d99b81fbc9 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -91,6 +91,9 @@ public class StatsTestUtils { } } + /** + * This tag will be propagated by {@link FakeStatsContextFactory} on the wire. + */ public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag"); private static final String EXTRA_TAG_HEADER_VALUE_PREFIX = "extratag:";