census: add per call latency metric (#9906)

* updated call latency measure with AGGREGATION_WITH_MILLIS_HISTOGRAM; added test for call latency view
This commit is contained in:
DNVindhya 2023-03-03 09:36:14 -08:00 committed by GitHub
parent 5be17e8b22
commit 66f95b7ade
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 8 deletions

View File

@ -17,6 +17,7 @@
package io.grpc.census; package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
@ -415,6 +416,7 @@ final class CensusStatsModule {
ClientTracer inboundMetricTracer; ClientTracer inboundMetricTracer;
private final CensusStatsModule module; private final CensusStatsModule module;
private final Stopwatch stopwatch; private final Stopwatch stopwatch;
private final Stopwatch callStopwatch;
@GuardedBy("lock") @GuardedBy("lock")
private boolean callEnded; private boolean callEnded;
private final TagContext parentCtx; private final TagContext parentCtx;
@ -429,6 +431,7 @@ final class CensusStatsModule {
private final Object lock = new Object(); private final Object lock = new Object();
// write @GuardedBy("lock") and happens before read // write @GuardedBy("lock") and happens before read
private long retryDelayNanos; private long retryDelayNanos;
private long callLatencyNanos;
@GuardedBy("lock") @GuardedBy("lock")
private int activeStreams; private int activeStreams;
@GuardedBy("lock") @GuardedBy("lock")
@ -440,6 +443,7 @@ final class CensusStatsModule {
this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.stopwatch = module.stopwatchSupplier.get(); this.stopwatch = module.stopwatchSupplier.get();
this.callStopwatch = module.stopwatchSupplier.get().start();
TagValue methodTag = TagValue.create(fullMethodName); TagValue methodTag = TagValue.create(fullMethodName);
startCtx = module.tagger.toBuilder(parentCtx) startCtx = module.tagger.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag) .putLocal(RpcMeasureConstants.GRPC_CLIENT_METHOD, methodTag)
@ -501,6 +505,7 @@ final class CensusStatsModule {
if (!module.recordFinishedRpcs) { if (!module.recordFinishedRpcs) {
return; return;
} }
callStopwatch.stop();
this.status = status; this.status = status;
boolean shouldRecordFinishedCall = false; boolean shouldRecordFinishedCall = false;
synchronized (lock) { synchronized (lock) {
@ -538,10 +543,12 @@ final class CensusStatsModule {
if (attempts > 0) { if (attempts > 0) {
retriesPerCall = attempts - 1; retriesPerCall = attempts - 1;
} }
callLatencyNanos = callStopwatch.elapsed(TimeUnit.NANOSECONDS);
MeasureMap measureMap = module.statsRecorder.newMeasureMap() MeasureMap measureMap = module.statsRecorder.newMeasureMap()
.put(RETRIES_PER_CALL, retriesPerCall) .put(RETRIES_PER_CALL, retriesPerCall)
.put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get()) .put(TRANSPARENT_RETRIES_PER_CALL, transparentRetriesPerCall.get())
.put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI); .put(RETRY_DELAY_PER_CALL, retryDelayNanos / NANOS_PER_MILLI)
.put(API_LATENCY_PER_CALL, callLatencyNanos / NANOS_PER_MILLI);
TagValue methodTag = TagValue.create(fullMethodName); TagValue methodTag = TagValue.create(fullMethodName);
TagValue statusTag = TagValue.create(status.getCode().toString()); TagValue statusTag = TagValue.create(status.getCode().toString());
measureMap.record( measureMap.record(

View File

@ -25,18 +25,43 @@ import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER
import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_RPC; import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_RPC;
import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS; import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS;
import com.google.common.annotations.VisibleForTesting;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants; import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.stats.Aggregation; import io.opencensus.stats.Aggregation;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.View; import io.opencensus.stats.View;
import java.util.Arrays; import java.util.Arrays;
/** Temporary holder class for the observability specific OpenCensus constants. // TODO(dnvindhya): Remove metric and view definitions from this class once it is moved to
* The class will be removed once the new views are added in OpenCensus library. */ // OpenCensus library.
/**
* Temporary holder class for the observability-specific OpenCensus constants. The class will be
* removed once the new views are added to the OpenCensus library.
*/
@VisibleForTesting
public final class ObservabilityCensusConstants { public final class ObservabilityCensusConstants {
static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM = static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM =
RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation(); RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation();
static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW.getAggregation();
public static final MeasureDouble API_LATENCY_PER_CALL =
Measure.MeasureDouble.create(
"grpc.io/client/api_latency",
"Time taken by gRPC to complete an RPC from application's perspective",
"ms");
public static final View GRPC_CLIENT_API_LATENCY_VIEW =
View.create(
View.Name.create("grpc.io/client/api_latency"),
"Time taken by gRPC to complete an RPC from application's perspective",
API_LATENCY_PER_CALL,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
Arrays.asList(GRPC_CLIENT_METHOD, GRPC_CLIENT_STATUS));
public static final View GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW = public static final View GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW =
View.create( View.create(
View.Name.create("grpc.io/client/sent_compressed_message_bytes_per_rpc"), View.Name.create("grpc.io/client/sent_compressed_message_bytes_per_rpc"),

View File

@ -21,6 +21,7 @@ import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL; import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL; import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL; import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -63,6 +64,7 @@ import io.grpc.ServerStreamTracer.ServerCallInfo;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory; import io.grpc.census.CensusTracingModule.CallAttemptsTracerFactory;
import io.grpc.census.internal.DeprecatedCensusConstants; import io.grpc.census.internal.DeprecatedCensusConstants;
import io.grpc.census.internal.ObservabilityCensusConstants;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StatsTestUtils;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
@ -121,6 +123,8 @@ import org.mockito.junit.MockitoRule;
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class CensusModulesTest { public class CensusModulesTest {
private static final double TOLERANCE = 1e-6;
private static final CallOptions.Key<String> CUSTOM_OPTION = private static final CallOptions.Key<String> CUSTOM_OPTION =
CallOptions.Key.createWithDefault("option1", "default"); CallOptions.Key.createWithDefault("option1", "default");
private static final CallOptions CALL_OPTIONS = private static final CallOptions CALL_OPTIONS =
@ -368,7 +372,7 @@ public class CensusModulesTest {
.setSampleToLocalSpanStore(false) .setSampleToLocalSpanStore(false)
.build()); .build());
verify(spyClientSpan, never()).end(); verify(spyClientSpan, never()).end();
assertZeroRetryRecorded(); assertPerCallMetrics(0D);
} }
@Test @Test
@ -503,7 +507,7 @@ public class CensusModulesTest {
DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24, assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)); record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY));
assertZeroRetryRecorded(); assertPerCallMetrics(30D + 100 + 16 + 24);
} else { } else {
assertNull(statsRecorder.pollRecord()); assertNull(statsRecorder.pollRecord());
} }
@ -691,6 +695,8 @@ public class CensusModulesTest {
assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1); assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(1);
assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2); assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(2);
assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10); assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(1000D + 10 + 10);
assertThat(record.getMetric(API_LATENCY_PER_CALL))
.isEqualTo(30D + 100 + 24 + 1000 + 100 + 10 + 10 + 16 + 24);
} }
private void assertRealTimeMetric( private void assertRealTimeMetric(
@ -716,13 +722,14 @@ public class CensusModulesTest {
assertEquals(expectedValue, record.getMetricAsLongOrFail(measure)); assertEquals(expectedValue, record.getMetricAsLongOrFail(measure));
} }
private void assertZeroRetryRecorded() { private void assertPerCallMetrics(double expectedLatencyValue) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD); TagValue methodTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString()); assertEquals(method.getFullMethodName(), methodTag.asString());
assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0); assertThat(record.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); assertThat(record.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); assertThat(record.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
assertThat(record.getMetric(API_LATENCY_PER_CALL)).isEqualTo(expectedLatencyValue);
} }
@Test @Test
@ -849,7 +856,7 @@ public class CensusModulesTest {
3000, 3000,
record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)); record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY));
assertNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_SERVER_LATENCY)); assertNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_SERVER_LATENCY));
assertZeroRetryRecorded(); assertPerCallMetrics(3000D);
} }
@Test @Test
@ -989,7 +996,7 @@ public class CensusModulesTest {
assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT)); assertNull(clientRecord.getMetric(DeprecatedCensusConstants.RPC_CLIENT_ERROR_COUNT));
TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); assertEquals("extra-tag-value-897", clientPropagatedTag.asString());
assertZeroRetryRecorded(); assertPerCallMetrics(0D);
} }
if (!recordStats) { if (!recordStats) {
@ -1507,6 +1514,81 @@ public class CensusModulesTest {
}); });
} }
/**
 * Checks that finishing a call publishes its latency to
 * {@code GRPC_CLIENT_API_LATENCY_VIEW}, keyed by the method name and the "OK" status.
 *
 * <p>The fake clock is advanced by 50ms between stream creation and call end, and the view is
 * expected to hold exactly one sample with that value.
 */
@Test
public void callLatencyView() throws InterruptedException {
  // A fresh stats component, with the latency view registered, backs the module under test.
  StatsComponent viewStats = new StatsComponentImpl();
  viewStats.getViewManager()
      .registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW);

  CensusStatsModule statsModule = new CensusStatsModule(
      tagger, tagCtxSerializer, viewStats.getStatsRecorder(), fakeClock.getStopwatchSupplier(),
      false, false, true, false /* real-time */, true);
  CensusStatsModule.CallAttemptsTracerFactory tracerFactory =
      new CensusStatsModule.CallAttemptsTracerFactory(
          statsModule, tagger.empty(), method.getFullMethodName());

  // One attempt: create the stream, let 50ms pass on the fake clock, then close and end the call.
  Metadata metadata = new Metadata();
  ClientStreamTracer attemptTracer =
      tracerFactory.newClientStreamTracer(STREAM_INFO, metadata);
  attemptTracer.streamCreated(Attributes.EMPTY, metadata);
  fakeClock.forwardTime(50, MILLISECONDS);
  Status okStatus = Status.OK.withDescription("Success");
  attemptTracer.streamClosed(okStatus);
  tracerFactory.callEnded(okStatus);

  // Give OpenCensus a chance to update the views asynchronously.
  Thread.sleep(100);

  // Tag values the recorded row is looked up by: full method name, then status code.
  ImmutableList<TagValue> expectedTags =
      ImmutableList.of(TagValue.create(method.getFullMethodName()), TagValue.create("OK"));
  // A single sample is expected, so exactly one histogram bucket holds a count of one.
  ImmutableList<Long> expectedBuckets =
      ImmutableList.of(
          0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 1L,
          0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);

  assertDistributionData(
      viewStats,
      ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW,
      expectedTags,
      50.0, 1, 0.0,
      expectedBuckets);
}
/**
 * Asserts that the row recorded for {@code dimension} in {@code view} is a distribution with
 * the given statistics.
 *
 * @param localStats stats component whose view manager holds {@code view}
 * @param view view whose aggregation map is inspected
 * @param dimension tag values identifying the row to check
 * @param mean expected mean, compared within {@code TOLERANCE}
 * @param count expected number of recorded samples
 * @param sumOfSquaredDeviations expected sum of squared deviations, compared within
 *     {@code TOLERANCE}
 * @param expectedBucketCounts expected histogram bucket counts, in bucket order
 */
private void assertDistributionData(StatsComponent localStats, View view,
    List<TagValue> dimension, double mean, long count, double sumOfSquaredDeviations,
    List<Long> expectedBucketCounts) {
  // Handler for the distribution case: every statistic must match.
  Function<AggregationData.DistributionData, Void> verifyDistribution =
      new Function<AggregationData.DistributionData, Void>() {
        @Override
        public Void apply(AggregationData.DistributionData distribution) {
          assertThat(distribution.getMean()).isWithin(TOLERANCE).of(mean);
          assertThat(distribution.getCount()).isEqualTo(count);
          assertThat(distribution.getSumOfSquaredDeviations())
              .isWithin(TOLERANCE)
              .of(sumOfSquaredDeviations);
          assertThat(distribution.getBucketCounts())
              .containsExactlyElementsIn(expectedBucketCounts)
              .inOrder();
          return null;
        }
      };
  // Default handler: treats the data as a distribution and only compares the sample count.
  Function<AggregationData, Void> verifyCountOnly =
      new Function<AggregationData, Void>() {
        @Override
        public Void apply(AggregationData data) {
          assertThat(((AggregationData.DistributionData) data).getCount()).isEqualTo(count);
          return null;
        }
      };

  AggregationData recorded =
      localStats.getViewManager().getView(view.getName()).getAggregationMap().get(dimension);
  // Every typed handler other than the distribution one raises an assertion error.
  recorded.match(
      Functions.</*@Nullable*/ Void>throwAssertionError(),
      Functions.</*@Nullable*/ Void>throwAssertionError(),
      Functions.</*@Nullable*/ Void>throwAssertionError(),
      /* p3= */ verifyDistribution,
      Functions.</*@Nullable*/ Void>throwAssertionError(),
      Functions.</*@Nullable*/ Void>throwAssertionError(),
      verifyCountOnly);
}
static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> { static class CallInfo<ReqT, RespT> extends ServerCallInfo<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> methodDescriptor; private final MethodDescriptor<ReqT, RespT> methodDescriptor;
private final Attributes attributes; private final Attributes attributes;

View File

@ -154,6 +154,7 @@ public final class GcpObservability implements AutoCloseable {
viewManager.registerView(RpcViewConstants.GRPC_CLIENT_COMPLETED_RPC_VIEW); viewManager.registerView(RpcViewConstants.GRPC_CLIENT_COMPLETED_RPC_VIEW);
viewManager.registerView(RpcViewConstants.GRPC_CLIENT_STARTED_RPC_VIEW); viewManager.registerView(RpcViewConstants.GRPC_CLIENT_STARTED_RPC_VIEW);
viewManager.registerView(RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW); viewManager.registerView(RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW);
viewManager.registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW);
viewManager.registerView( viewManager.registerView(
ObservabilityCensusConstants.GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); ObservabilityCensusConstants.GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW);
viewManager.registerView( viewManager.registerView(