core: record server_elapsed_time on client (#2673)

It is defined as the time between the client sends out the headers, and the RPC finishes.
This commit is contained in:
Kun Zhang 2017-02-03 13:29:06 -08:00 committed by GitHub
parent 6fbe140959
commit 7ab5e0e810
7 changed files with 283 additions and 15 deletions

View File

@ -34,7 +34,6 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
@ -124,6 +123,7 @@ public abstract class AbstractStream2 implements Stream {
private final MessageDeframer deframer;
private final Object onReadyLock = new Object();
private final StatsTraceContext statsTraceCtx;
/**
* The number of bytes currently queued, waiting to be sent. When this falls below
* DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
@ -144,15 +144,11 @@ public abstract class AbstractStream2 implements Stream {
private boolean deallocated;
protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
deframer = new MessageDeframer(
this, Codec.Identity.NONE, maxMessageSize, statsTraceCtx, getClass().getName());
}
@VisibleForTesting
TransportState(MessageDeframer deframer) {
this.deframer = deframer;
}
final void setMaxInboundMessageSize(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}
@ -220,6 +216,10 @@ public abstract class AbstractStream2 implements Stream {
}
}
public final StatsTraceContext getStatsTraceContext() {
return statsTraceCtx;
}
private void setDecompressor(Decompressor decompressor) {
if (deframer.isClosed()) {
return;

View File

@ -32,6 +32,7 @@
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.instrumentation.stats.MeasurementDescriptor;
@ -64,6 +65,8 @@ public final class StatsTraceContext {
"noopservice/noopmethod", NoopStatsContextFactory.INSTANCE,
GrpcUtil.STOPWATCH_SUPPLIER);
private static final double NANOS_PER_MILLI = 1000 * 1000;
private enum Side {
CLIENT, SERVER
}
@ -72,6 +75,7 @@ public final class StatsTraceContext {
private final Stopwatch stopwatch;
private final Side side;
private final Metadata.Key<StatsContext> statsHeader;
private volatile long clientPendingNanos = -1;
private volatile long wireBytesSent;
private volatile long wireBytesReceived;
private volatile long uncompressedBytesSent;
@ -204,6 +208,18 @@ public final class StatsTraceContext {
uncompressedBytesReceived += bytes;
}
/**
* Mark the time when the headers, which are the first bytes of the RPC, are sent from the client.
* This is specific to transport implementation, thus should be called from transports. Calling
* it the second time or more is a no-op.
*/
public void clientHeadersSent() {
Preconditions.checkState(side == Side.CLIENT, "Must be called on client-side");
if (clientPendingNanos < 0) {
clientPendingNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
}
}
/**
* Record a finished all and mark the current time as the end time.
*
@ -233,14 +249,21 @@ public final class StatsTraceContext {
uncompressedBytesSentMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES;
uncompressedBytesReceivedMetric = RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES;
}
statsCtx
.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.record(MeasurementMap.builder()
.put(latencyMetric, stopwatch.elapsed(TimeUnit.MILLISECONDS))
.put(wireBytesSentMetric, wireBytesSent)
.put(wireBytesReceivedMetric, wireBytesReceived)
.put(uncompressedBytesSentMetric, uncompressedBytesSent)
.put(uncompressedBytesReceivedMetric, uncompressedBytesReceived)
.build());
long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
MeasurementMap.Builder builder = MeasurementMap.builder()
.put(latencyMetric, roundtripNanos / NANOS_PER_MILLI) // in double
.put(wireBytesSentMetric, wireBytesSent)
.put(wireBytesReceivedMetric, wireBytesReceived)
.put(uncompressedBytesSentMetric, uncompressedBytesSent)
.put(uncompressedBytesReceivedMetric, uncompressedBytesReceived);
if (side == Side.CLIENT) {
if (clientPendingNanos >= 0) {
builder.put(
RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME,
(roundtripNanos - clientPendingNanos) / NANOS_PER_MILLI); // in double
}
}
statsCtx.with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.record(builder.build());
}
}

View File

@ -0,0 +1,237 @@
/*
* Copyright 2017, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import com.google.instrumentation.stats.RpcConstants;
import com.google.instrumentation.stats.StatsContext;
import com.google.instrumentation.stats.TagValue;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
import io.grpc.internal.testing.StatsTestUtils;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Test for {@link StatsTraceContext}.
*/
@RunWith(JUnit4.class)
public class StatsTraceContextTest {
private FakeClock fakeClock = new FakeClock();
private FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
@After
public void allRecordsVerified() {
assertNull(statsCtxFactory.pollRecord());
}
@Test
public void clientBasic() {
String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1");
StatsTraceContext ctx = StatsTraceContext.newClientContextForTesting(
methodName, statsCtxFactory, statsCtxFactory.getDefault(),
fakeClock.getStopwatchSupplier());
fakeClock.forwardMillis(30);
ctx.clientHeadersSent();
fakeClock.forwardMillis(100);
ctx.wireBytesSent(1028);
ctx.uncompressedBytesSent(1128);
fakeClock.forwardMillis(16);
ctx.wireBytesReceived(33);
ctx.uncompressedBytesReceived(67);
ctx.wireBytesSent(99);
ctx.uncompressedBytesSent(865);
fakeClock.forwardMillis(24);
ctx.wireBytesReceived(154);
ctx.uncompressedBytesReceived(552);
ctx.callEnded(Status.OK);
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
assertEquals(methodName, methodTag.toString());
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), statusTag.toString());
assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(1128 + 865,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertEquals(100 + 16 + 24,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
}
@Test
public void clientNotSent() {
String methodName = MethodDescriptor.generateFullMethodName("Service1", "method2");
StatsTraceContext ctx = StatsTraceContext.newClientContextForTesting(
methodName, statsCtxFactory, statsCtxFactory.getDefault(),
fakeClock.getStopwatchSupplier());
fakeClock.forwardMillis(3000);
ctx.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
assertEquals(methodName, methodTag.toString());
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(0,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
assertEquals(0,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
}
/**
* Tags that are propagated by the {@link StatsContextFactory} are properly propagated via
* the headers.
*/
@Test
public void tagPropagation() {
String methodName = MethodDescriptor.generateFullMethodName("Service1", "method3");
// EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
// propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates
// the propagation by putting them in the headers.
StatsContext parentCtx = statsCtxFactory.getDefault().with(
StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
StatsTraceContext clientCtx = StatsTraceContext.newClientContextForTesting(
methodName, statsCtxFactory, parentCtx, fakeClock.getStopwatchSupplier());
Metadata headers = new Metadata();
clientCtx.propagateToHeaders(headers);
// The server gets the propagated tag from the headers, and puts it on the server-side
// StatsContext.
StatsTraceContext serverCtx = StatsTraceContext.newServerContext(
methodName, statsCtxFactory, headers, fakeClock.getStopwatchSupplier());
serverCtx.callEnded(Status.OK);
clientCtx.callEnded(Status.OK);
StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord();
assertNotNull(serverRecord);
assertNoClientContent(serverRecord);
TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD);
assertEquals(methodName, serverMethodTag.toString());
TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), serverStatusTag.toString());
TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", serverPropagatedTag.toString());
StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord();
assertNotNull(clientRecord);
assertNoServerContent(clientRecord);
TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD);
assertEquals(methodName, clientMethodTag.toString());
TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), clientStatusTag.toString());
TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", clientPropagatedTag.toString());
}
@Test
public void serverBasic() {
String methodName = MethodDescriptor.generateFullMethodName("Service1", "method4");
StatsTraceContext ctx = StatsTraceContext.newServerContext(
methodName, statsCtxFactory, new Metadata(), fakeClock.getStopwatchSupplier());
ctx.wireBytesReceived(34);
ctx.uncompressedBytesReceived(67);
fakeClock.forwardMillis(100);
ctx.wireBytesSent(1028);
ctx.uncompressedBytesSent(1128);
fakeClock.forwardMillis(16);
ctx.wireBytesReceived(154);
ctx.uncompressedBytesReceived(552);
ctx.wireBytesSent(99);
ctx.uncompressedBytesSent(865);
fakeClock.forwardMillis(24);
ctx.callEnded(Status.CANCELLED);
StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
assertNotNull(record);
assertNoClientContent(record);
TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
assertEquals(methodName, methodTag.toString());
TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
assertEquals(1128 + 865,
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(100 + 16 + 24,
record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY));
}
private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
}
private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
}
}

View File

@ -1431,6 +1431,7 @@ public abstract class AbstractInteropTest {
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(uncompressedResponsesSize,
record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
// It's impossible to get the expected wire sizes because it may be compressed, so we just
// check if they are recorded.
assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));

View File

@ -419,6 +419,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// was canceled via RST_STREAM.
Http2Stream http2Stream = connection().stream(streamId);
if (http2Stream != null) {
stream.getStatsTraceContext().clientHeadersSent();
http2Stream.setProperty(streamKey, stream);
// Attach the client stream to the HTTP/2 stream object as user data.

View File

@ -72,6 +72,7 @@ class OkHttpClientStream extends Http2ClientStream {
private final OkHttpClientTransport transport;
private final Object lock;
private final String userAgent;
private final StatsTraceContext statsTraceCtx;
private String authority;
private Object outboundFlowState;
private volatile int id = ABSENT_ID;
@ -98,6 +99,7 @@ class OkHttpClientStream extends Http2ClientStream {
String userAgent,
StatsTraceContext statsTraceCtx) {
super(new OkHttpWritableBufferAllocator(), maxMessageSize, statsTraceCtx);
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.method = method;
this.headers = headers;
this.frameWriter = frameWriter;
@ -160,6 +162,7 @@ class OkHttpClientStream extends Http2ClientStream {
if (pendingData != null) {
// Only happens when the stream has neither been started nor cancelled.
frameWriter.synStream(false, false, id, 0, requestHeaders);
statsTraceCtx.clientHeadersSent();
requestHeaders = null;
boolean flush = false;

View File

@ -91,6 +91,9 @@ public class StatsTestUtils {
}
}
/**
* This tag will be propagated by {@link FakeStatsContextFactory} on the wire.
*/
public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag");
private static final String EXTRA_TAG_HEADER_VALUE_PREFIX = "extratag:";