core: record RPC upstarts to Census. (#3708)

RPC upstarts are counted into metrics
RPC_{CLIENT,SERVER}_STARTED_COUNT. In addition, RPC completions are
counted into metrics RPC_{CLIENT,SERVER}_FINISHED_COUNT.  From these
metrics, users will be able to derive count of RPCs that are currently
active.
This commit is contained in:
Kun Zhang 2017-11-10 17:22:22 -08:00 committed by GitHub
parent bd32d6f599
commit ca9a41a46e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 324 additions and 128 deletions

View File

@ -71,7 +71,8 @@ public final class InProcessChannelBuilder extends
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setRecordStats(false);
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
}
@Override

View File

@ -81,7 +81,8 @@ public final class InProcessServerBuilder
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
setRecordStats(false);
setStatsRecordStartedRpcs(false);
setStatsRecordFinishedRpcs(false);
}
@Override

View File

@ -144,7 +144,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
private boolean statsEnabled = true;
private boolean recordStats = true;
private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;
@Nullable
@ -296,11 +297,19 @@ public abstract class AbstractManagedChannelImplBuilder
}
/**
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
* Enabled by default.
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setRecordStats(boolean value) {
recordStats = value;
protected void setStatsRecordStartedRpcs(boolean value) {
recordStartedRpcs = value;
}
/**
* Disable or enable stats recording for RPC completions. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setStatsRecordFinishedRpcs(boolean value) {
recordFinishedRpcs = value;
}
/**
@ -348,7 +357,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
// First interceptor runs last (see ClientInterceptors.intercept()), so that no
// other interceptor can override the tracer factory we set in CallOptions.
effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats));
effectiveInterceptors.add(
0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =

View File

@ -103,7 +103,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
private CensusStatsModule censusStatsOverride;
private boolean statsEnabled = true;
private boolean recordStats = true;
private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;
@Override
@ -207,11 +208,19 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
}
/**
* Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
* Enabled by default.
* Disable or enable stats recording for RPC upstarts. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setRecordStats(boolean value) {
recordStats = value;
protected void setStatsRecordStartedRpcs(boolean value) {
recordStartedRpcs = value;
}
/**
* Disable or enable stats recording for RPC completions. Effective only if {@link
* #setStatsEnabled} is set to true. Enabled by default.
*/
protected void setStatsRecordFinishedRpcs(boolean value) {
recordFinishedRpcs = value;
}
/**
@ -242,7 +251,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
if (censusStats == null) {
censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
}
tracerFactories.add(censusStats.getServerTracerFactory(recordStats));
tracerFactories.add(
censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =

View File

@ -16,7 +16,6 @@
package io.grpc.internal;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
@ -53,7 +52,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Provides factories for {@link StreamTracer} that records stats to Census.
@ -133,22 +131,25 @@ public final class CensusStatsModule {
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(
TagContext parentCtx, String fullMethodName, boolean recordStats) {
return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats);
TagContext parentCtx, String fullMethodName,
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new ClientCallTracer(
this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs);
}
/**
* Returns the server tracer factory.
*/
ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) {
return new ServerTracerFactory(recordStats);
ServerStreamTracer.Factory getServerTracerFactory(
boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs);
}
/**
* Returns the client interceptor that facilitates Census-based stats reporting.
*/
ClientInterceptor getClientInterceptor(boolean recordStats) {
return new StatsClientInterceptor(recordStats);
ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs);
}
private static final class ClientTracer extends ClientStreamTracer {
@ -221,18 +222,27 @@ public final class CensusStatsModule {
private volatile ClientTracer streamTracer;
private volatile int callEnded;
private final TagContext parentCtx;
private final boolean recordStats;
private final TagContext startCtx;
private final boolean recordFinishedRpcs;
ClientCallTracer(
CensusStatsModule module,
TagContext parentCtx,
String fullMethodName,
boolean recordStats) {
boolean recordStartedRpcs,
boolean recordFinishedRpcs) {
this.module = module;
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx);
this.startCtx =
module.tagger.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)).build();
this.stopwatch = module.stopwatchSupplier.get().start();
this.recordStats = recordStats;
this.recordFinishedRpcs = recordFinishedRpcs;
if (recordStartedRpcs) {
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT, 1)
.record(startCtx);
}
}
@Override
@ -262,7 +272,7 @@ public final class CensusStatsModule {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
if (!recordStats) {
if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
@ -272,7 +282,8 @@ public final class CensusStatsModule {
tracer = BLANK_CLIENT_TRACER;
}
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// The metrics are in double
.put(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT, 1)
// The latency is double value
.put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
.put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
@ -290,8 +301,7 @@ public final class CensusStatsModule {
measureMap.record(
module
.tagger
.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName))
.toBuilder(startCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
@ -315,12 +325,11 @@ public final class CensusStatsModule {
private final CensusStatsModule module;
private final String fullMethodName;
@Nullable
private final TagContext parentCtx;
private volatile int streamClosed;
private final Stopwatch stopwatch;
private final Tagger tagger;
private final boolean recordStats;
private final boolean recordFinishedRpcs;
private volatile long outboundMessageCount;
private volatile long inboundMessageCount;
private volatile long outboundWireSize;
@ -334,13 +343,18 @@ public final class CensusStatsModule {
TagContext parentCtx,
Supplier<Stopwatch> stopwatchSupplier,
Tagger tagger,
boolean recordStats) {
boolean recordStartedRpcs,
boolean recordFinishedRpcs) {
this.module = module;
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.stopwatch = stopwatchSupplier.get().start();
this.tagger = tagger;
this.recordStats = recordStats;
this.recordFinishedRpcs = recordFinishedRpcs;
if (recordStartedRpcs) {
module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT, 1)
.record(parentCtx);
}
}
@Override
@ -384,13 +398,14 @@ public final class CensusStatsModule {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
if (!recordStats) {
if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// The metrics are in double
.put(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT, 1)
// The latency is double value
.put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
.put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
@ -401,11 +416,10 @@ public final class CensusStatsModule {
if (!status.isOk()) {
measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1);
}
TagContext ctx = firstNonNull(parentCtx, tagger.empty());
measureMap.record(
module
.tagger
.toBuilder(ctx)
.toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
@ -421,10 +435,12 @@ public final class CensusStatsModule {
@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
private final boolean recordStats;
private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs;
ServerTracerFactory(boolean recordStats) {
this.recordStats = recordStats;
ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
this.recordStartedRpcs = recordStartedRpcs;
this.recordFinishedRpcs = recordFinishedRpcs;
}
@Override
@ -444,16 +460,19 @@ public final class CensusStatsModule {
parentCtx,
stopwatchSupplier,
tagger,
recordStats);
recordStartedRpcs,
recordFinishedRpcs);
}
}
@VisibleForTesting
final class StatsClientInterceptor implements ClientInterceptor {
private final boolean recordStats;
private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs;
StatsClientInterceptor(boolean recordStats) {
this.recordStats = recordStats;
StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
this.recordStartedRpcs = recordStartedRpcs;
this.recordFinishedRpcs = recordFinishedRpcs;
}
@Override
@ -462,7 +481,8 @@ public final class CensusStatsModule {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats);
newClientCallTracer(parentCtx, method.getFullMethodName(),
recordStartedRpcs, recordFinishedRpcs);
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {

View File

@ -240,7 +240,7 @@ public class CensusModulesTest {
Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), callOptionsCaptureInterceptor,
censusStats.getClientInterceptor(true), censusTracing.getClientInterceptor());
censusStats.getClientInterceptor(true, true), censusTracing.getClientInterceptor());
ClientCall<String, String> call;
if (nonDefaultContext) {
Context ctx =
@ -275,7 +275,20 @@ public class CensusModulesTest {
// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
assertNull(statsRecorder.pollRecord());
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
if (nonDefaultContext) {
TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra value", extraTag.asString());
assertEquals(2, record.tags.size());
} else {
assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
assertEquals(1, record.tags.size());
}
if (nonDefaultContext) {
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
@ -297,9 +310,9 @@ public class CensusModulesTest {
assertEquals("No you don't", status.getDescription());
// The intercepting listener calls callEnded() on ClientCallTracer, which records to Census.
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
record = statsRecorder.pollRecord();
assertNotNull(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString());
@ -320,12 +333,44 @@ public class CensusModulesTest {
}
@Test
public void clientBasicStatsDefaultContext() {
public void clientBasicStatsDefaultContext_startsAndFinishes() {
subtestClientBasicStatsDefaultContext(true, true);
}
@Test
public void clientBasicStatsDefaultContext_startsOnly() {
subtestClientBasicStatsDefaultContext(true, false);
}
@Test
public void clientBasicStatsDefaultContext_finishesOnly() {
subtestClientBasicStatsDefaultContext(false, true);
}
@Test
public void clientBasicStatsDefaultContext_neither() {
subtestClientBasicStatsDefaultContext(false, true);
}
private void subtestClientBasicStatsDefaultContext(boolean recordStarts, boolean recordFinishes) {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true);
censusStats.newClientCallTracer(
tagger.empty(), method.getFullMethodName(), recordStarts, recordFinishes);
Metadata headers = new Metadata();
ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
if (recordStarts) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
assertEquals(1, record.tags.size());
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT));
} else {
assertNull(statsRecorder.pollRecord());
}
fakeClock.forwardTime(30, MILLISECONDS);
tracer.outboundHeaders();
@ -349,27 +394,32 @@ public class CensusModulesTest {
tracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), statusTag.asString());
assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(
1128 + 865,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT));
assertEquals(
33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
if (recordFinishes) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), statusTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT));
assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(
1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
assertEquals(
1128 + 865,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT));
assertEquals(
33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(30 + 100 + 16 + 24,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
} else {
assertNull(statsRecorder.pollRecord());
}
}
@Test
@ -433,18 +483,29 @@ public class CensusModulesTest {
public void clientStreamNeverCreatedStillRecordStats() {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(
tagger.empty(), method.getFullMethodName(), true);
tagger.empty(), method.getFullMethodName(), true, true);
fakeClock.forwardTime(3000, MILLISECONDS);
callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
// Upstart record
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
assertEquals(1, record.tags.size());
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT));
// Completion record
record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
@ -513,9 +574,21 @@ public class CensusModulesTest {
propagate);
Metadata headers = new Metadata();
CensusStatsModule.ClientCallTracer callTracer =
census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats);
census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats, recordStats);
// This propagates clientCtx to headers if propagates==true
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
if (recordStats) {
// Client upstart record
StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord();
assertNotNull(clientRecord);
assertNoServerContent(clientRecord);
assertEquals(2, clientRecord.tags.size());
TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), clientMethodTag.asString());
TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", clientPropagatedTag.asString());
}
if (propagate) {
assertTrue(headers.containsKey(census.statsHeader));
} else {
@ -524,7 +597,7 @@ public class CensusModulesTest {
}
ServerStreamTracer serverTracer =
census.getServerTracerFactory(recordStats).newServerStreamTracer(
census.getServerTracerFactory(recordStats, recordStats).newServerStreamTracer(
method.getFullMethodName(), headers);
// Server tracer deserializes clientCtx from the headers, so that it records stats with the
// propagated tags.
@ -540,15 +613,26 @@ public class CensusModulesTest {
serverTracer.streamClosed(Status.OK);
if (recordStats) {
// Server upstart record
StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord();
assertNotNull(serverRecord);
assertNoClientContent(serverRecord);
assertEquals(2, serverRecord.tags.size());
TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), serverMethodTag.asString());
TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", serverPropagatedTag.asString());
// Server completion record
serverRecord = statsRecorder.pollRecord();
assertNotNull(serverRecord);
assertNoClientContent(serverRecord);
serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), serverMethodTag.asString());
TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), serverStatusTag.asString());
assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", serverPropagatedTag.asString());
}
@ -557,6 +641,7 @@ public class CensusModulesTest {
callTracer.callEnded(Status.OK);
if (recordStats) {
// Client completion record
StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord();
assertNotNull(clientRecord);
assertNoServerContent(clientRecord);
@ -577,7 +662,7 @@ public class CensusModulesTest {
@Test
public void statsHeadersNotPropagateDefaultContext() {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true);
censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), false, false);
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
assertFalse(headers.containsKey(censusStats.statsHeader));
@ -657,11 +742,43 @@ public class CensusModulesTest {
}
@Test
public void serverBasicStatsNoHeaders() {
ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(true);
public void serverBasicStatsNoHeaders_startsAndFinishes() {
subtestServerBasicStatsNoHeaders(true, true);
}
@Test
public void serverBasicStatsNoHeaders_startsOnly() {
subtestServerBasicStatsNoHeaders(true, false);
}
@Test
public void serverBasicStatsNoHeaders_finishesOnly() {
subtestServerBasicStatsNoHeaders(false, true);
}
@Test
public void serverBasicStatsNoHeaders_neither() {
subtestServerBasicStatsNoHeaders(false, false);
}
private void subtestServerBasicStatsNoHeaders(boolean recordStarts, boolean recordFinishes) {
ServerStreamTracer.Factory tracerFactory =
censusStats.getServerTracerFactory(recordStarts, recordFinishes);
ServerStreamTracer tracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
if (recordStarts) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoClientContent(record);
assertEquals(1, record.tags.size());
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT));
} else {
assertNull(statsRecorder.pollRecord());
}
Context filteredContext = tracer.filterContext(Context.ROOT);
TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext);
assertEquals(
@ -694,27 +811,32 @@ public class CensusModulesTest {
tracer.streamClosed(Status.CANCELLED);
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoClientContent(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT));
assertEquals(
1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES));
assertEquals(
1128 + 865,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
assertEquals(
34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(100 + 16 + 24,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY));
if (recordFinishes) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoClientContent(record);
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString());
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT));
assertEquals(
1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES));
assertEquals(
1128 + 865,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
assertEquals(
34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES));
assertEquals(67 + 552,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
assertEquals(100 + 16 + 24,
record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY));
} else {
assertNull(statsRecorder.pollRecord());
}
}
@Test

View File

@ -719,12 +719,15 @@ public abstract class AbstractInteropTest {
Status.fromThrowable(responseObserver.getError()).getCode());
if (metricsExpected()) {
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall");
// CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
// recorded. The tracer stats rely on the stream being created, which is not always the case
// in this test. Therefore we don't check the tracer stats.
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode());
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord, "grpc.testing.TestService/StreamingInputCall",
Status.CANCELLED.getCode());
// Do not check server-side metrics, because the status on the server side is undetermined.
}
}
@ -1044,9 +1047,11 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// Stream may not have been created before deadline is exceeded, thus we don't test the tracer
// stats.
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord,
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord,
"grpc.testing.TestService/StreamingOutputCall",
Status.Code.DEADLINE_EXCEEDED);
// Do not check server-side metrics, because the status on the server side is undetermined.
@ -1078,9 +1083,11 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// Stream may not have been created when deadline is exceeded, thus we don't check tracer
// stats.
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord,
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord,
"grpc.testing.TestService/StreamingOutputCall",
Status.Code.DEADLINE_EXCEEDED);
// Do not check server-side metrics, because the status on the server side is undetermined.
@ -1103,9 +1110,12 @@ public abstract class AbstractInteropTest {
// recorded. The tracer stats rely on the stream being created, which is not the case if
// deadline is exceeded before the call is created. Therefore we don't check the tracer stats.
if (metricsExpected()) {
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode());
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode());
}
// warm up the channel
@ -1120,9 +1130,12 @@ public abstract class AbstractInteropTest {
}
assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
if (metricsExpected()) {
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode());
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord, "grpc.testing.TestService/EmptyCall",
Status.DEADLINE_EXCEEDED.getCode());
}
}
@ -1540,9 +1553,11 @@ public abstract class AbstractInteropTest {
// CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
// recorded. The tracer stats rely on the stream being created, which is not always the case
// in this test, thus we will not check that.
MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkTags(
clientRecord,
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall");
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
checkEndTags(
clientEndRecord,
"grpc.testing.TestService/FullDuplexCall",
Status.DEADLINE_EXCEEDED.getCode());
}
@ -1803,11 +1818,13 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// CensusStreamTracerModule records final status in interceptor, which is guaranteed to be
// done before application receives status.
MetricsRecord clientRecord = clientStatsRecorder.pollRecord();
checkTags(clientRecord, method, code);
MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord();
checkStartTags(clientStartRecord, method);
MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord();
checkEndTags(clientEndRecord, method, code);
if (requests != null && responses != null) {
checkCensus(clientRecord, false, requests, responses);
checkCensus(clientEndRecord, false, requests, responses);
}
}
}
@ -1831,21 +1848,24 @@ public abstract class AbstractInteropTest {
// tests. The best we can do here is to exhaust all records and find one that matches the
// given conditions.
while (true) {
MetricsRecord serverRecord;
MetricsRecord serverStartRecord;
MetricsRecord serverEndRecord;
try {
// On the server, the stats is finalized in ServerStreamListener.closed(), which can be
// run after the client receives the final status. So we use a timeout.
serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (serverRecord == null) {
if (serverEndRecord == null) {
break;
}
try {
checkTags(serverRecord, method, code);
checkStartTags(serverStartRecord, method);
checkEndTags(serverEndRecord, method, code);
if (requests != null && responses != null) {
checkCensus(serverRecord, true, requests, responses);
checkCensus(serverEndRecord, true, requests, responses);
}
passed = true;
break;
@ -1899,7 +1919,14 @@ public abstract class AbstractInteropTest {
}
}
private static void checkTags(
private static void checkStartTags(MetricsRecord record, String methodName) {
assertNotNull("record is not null", record);
TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertNotNull("method name tagged", methodNameTag);
assertEquals("method names match", methodName, methodNameTag.asString());
}
private static void checkEndTags(
MetricsRecord record, String methodName, Status.Code status) {
assertNotNull("record is not null", record);
TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);

View File

@ -104,6 +104,11 @@ public class StatsTestUtils {
}
return longValue;
}
@Override
public String toString() {
return "[tags=" + tags + ", metrics=" + metrics + "]";
}
}
/**