core: record real-time metrics to OpenCensus (updated to 1.18.0) (#5099)

Real-time metrics are total sent/received bytes and messages per
method, and are updated as the events occur rather than at the end of
RPCs.
This commit is contained in:
Kun Zhang 2018-11-29 16:30:58 -08:00 committed by GitHub
parent 2961857451
commit b5acbedd55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 205 additions and 40 deletions

View File

@ -110,7 +110,7 @@ subprojects {
protobufVersion = '3.5.1' protobufVersion = '3.5.1'
protocVersion = '3.5.1-1' protocVersion = '3.5.1-1'
protobufNanoVersion = '3.0.0-alpha-5' protobufNanoVersion = '3.0.0-alpha-5'
opencensusVersion = '0.17.0' opencensusVersion = '0.18.0'
configureProtoCompilation = { configureProtoCompilation = {
String generatedSourcePath = "${projectDir}/src/generated" String generatedSourcePath = "${projectDir}/src/generated"

View File

@ -165,6 +165,7 @@ public abstract class AbstractManagedChannelImplBuilder
private boolean statsEnabled = true; private boolean statsEnabled = true;
private boolean recordStartedRpcs = true; private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true; private boolean recordFinishedRpcs = true;
private boolean recordRealTimeMetrics = false;
private boolean tracingEnabled = true; private boolean tracingEnabled = true;
@Nullable @Nullable
@ -383,6 +384,14 @@ public abstract class AbstractManagedChannelImplBuilder
recordFinishedRpcs = value; recordFinishedRpcs = value;
} }
/**
* Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} is
* set to true. Disabled by default.
*/
protected void setStatsRecordRealTimeMetrics(boolean value) {
recordRealTimeMetrics = value;
}
/** /**
* Disable or enable tracing features. Enabled by default. * Disable or enable tracing features. Enabled by default.
* *
@ -433,7 +442,8 @@ public abstract class AbstractManagedChannelImplBuilder
CensusStatsModule censusStats = this.censusStatsOverride; CensusStatsModule censusStats = this.censusStatsOverride;
if (censusStats == null) { if (censusStats == null) {
censusStats = new CensusStatsModule( censusStats = new CensusStatsModule(
GrpcUtil.STOPWATCH_SUPPLIER, true, recordStartedRpcs, recordFinishedRpcs); GrpcUtil.STOPWATCH_SUPPLIER, true, recordStartedRpcs, recordFinishedRpcs,
recordRealTimeMetrics);
} }
// First interceptor runs last (see ClientInterceptors.intercept()), so that no // First interceptor runs last (see ClientInterceptors.intercept()), so that no
// other interceptor can override the tracer factory we set in CallOptions. // other interceptor can override the tracer factory we set in CallOptions.

View File

@ -108,6 +108,7 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
private boolean statsEnabled = true; private boolean statsEnabled = true;
private boolean recordStartedRpcs = true; private boolean recordStartedRpcs = true;
private boolean recordFinishedRpcs = true; private boolean recordFinishedRpcs = true;
private boolean recordRealTimeMetrics = false;
private boolean tracingEnabled = true; private boolean tracingEnabled = true;
@Nullable @Nullable
@ -239,6 +240,14 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
recordFinishedRpcs = value; recordFinishedRpcs = value;
} }
/**
* Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} is
* set to true. Disabled by default.
*/
protected void setStatsRecordRealTimeMetrics(boolean value) {
recordRealTimeMetrics = value;
}
/** /**
* Disable or enable tracing features. Enabled by default. * Disable or enable tracing features. Enabled by default.
*/ */
@ -266,7 +275,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
CensusStatsModule censusStats = this.censusStatsOverride; CensusStatsModule censusStats = this.censusStatsOverride;
if (censusStats == null) { if (censusStats == null) {
censusStats = new CensusStatsModule( censusStats = new CensusStatsModule(
GrpcUtil.STOPWATCH_SUPPLIER, true, recordStartedRpcs, recordFinishedRpcs); GrpcUtil.STOPWATCH_SUPPLIER, true, recordStartedRpcs, recordFinishedRpcs,
recordRealTimeMetrics);
} }
tracerFactories.add(censusStats.getServerTracerFactory()); tracerFactories.add(censusStats.getServerTracerFactory());
} }

View File

@ -36,6 +36,9 @@ import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StreamTracer; import io.grpc.StreamTracer;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.stats.MeasureMap; import io.opencensus.stats.MeasureMap;
import io.opencensus.stats.Stats; import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder; import io.opencensus.stats.StatsRecorder;
@ -67,7 +70,6 @@ import javax.annotation.Nullable;
public final class CensusStatsModule { public final class CensusStatsModule {
private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName()); private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName());
private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1);
private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer();
private final Tagger tagger; private final Tagger tagger;
private final StatsRecorder statsRecorder; private final StatsRecorder statsRecorder;
@ -77,18 +79,20 @@ public final class CensusStatsModule {
private final boolean propagateTags; private final boolean propagateTags;
private final boolean recordStartedRpcs; private final boolean recordStartedRpcs;
private final boolean recordFinishedRpcs; private final boolean recordFinishedRpcs;
private final boolean recordRealTimeMetrics;
/** /**
* Creates a {@link CensusStatsModule} with the default OpenCensus implementation. * Creates a {@link CensusStatsModule} with the default OpenCensus implementation.
*/ */
CensusStatsModule(Supplier<Stopwatch> stopwatchSupplier, CensusStatsModule(Supplier<Stopwatch> stopwatchSupplier,
boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs) { boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs,
boolean recordRealTimeMetrics) {
this( this(
Tags.getTagger(), Tags.getTagger(),
Tags.getTagPropagationComponent().getBinarySerializer(), Tags.getTagPropagationComponent().getBinarySerializer(),
Stats.getStatsRecorder(), Stats.getStatsRecorder(),
stopwatchSupplier, stopwatchSupplier,
propagateTags, recordStartedRpcs, recordFinishedRpcs); propagateTags, recordStartedRpcs, recordFinishedRpcs, recordRealTimeMetrics);
} }
/** /**
@ -98,7 +102,8 @@ public final class CensusStatsModule {
final Tagger tagger, final Tagger tagger,
final TagContextBinarySerializer tagCtxSerializer, final TagContextBinarySerializer tagCtxSerializer,
StatsRecorder statsRecorder, Supplier<Stopwatch> stopwatchSupplier, StatsRecorder statsRecorder, Supplier<Stopwatch> stopwatchSupplier,
boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs) { boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs,
boolean recordRealTimeMetrics) {
this.tagger = checkNotNull(tagger, "tagger"); this.tagger = checkNotNull(tagger, "tagger");
this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder");
checkNotNull(tagCtxSerializer, "tagCtxSerializer"); checkNotNull(tagCtxSerializer, "tagCtxSerializer");
@ -106,6 +111,7 @@ public final class CensusStatsModule {
this.propagateTags = propagateTags; this.propagateTags = propagateTags;
this.recordStartedRpcs = recordStartedRpcs; this.recordStartedRpcs = recordStartedRpcs;
this.recordFinishedRpcs = recordFinishedRpcs; this.recordFinishedRpcs = recordFinishedRpcs;
this.recordRealTimeMetrics = recordRealTimeMetrics;
this.statsHeader = this.statsHeader =
Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller<TagContext>() { Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller<TagContext>() {
@Override @Override
@ -154,6 +160,20 @@ public final class CensusStatsModule {
return new StatsClientInterceptor(); return new StatsClientInterceptor();
} }
private void recordRealTimeMetric(TagContext ctx, MeasureDouble measure, double value) {
if (recordRealTimeMetrics) {
MeasureMap measureMap = statsRecorder.newMeasureMap().put(measure, value);
measureMap.record(ctx);
}
}
private void recordRealTimeMetric(TagContext ctx, MeasureLong measure, long value) {
if (recordRealTimeMetrics) {
MeasureMap measureMap = statsRecorder.newMeasureMap().put(measure, value);
measureMap.record(ctx);
}
}
private static final class ClientTracer extends ClientStreamTracer { private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater; @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater;
@ -209,6 +229,9 @@ public final class CensusStatsModule {
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater; inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
} }
private final CensusStatsModule module;
private final TagContext startCtx;
volatile long outboundMessageCount; volatile long outboundMessageCount;
volatile long inboundMessageCount; volatile long inboundMessageCount;
volatile long outboundWireSize; volatile long outboundWireSize;
@ -216,6 +239,11 @@ public final class CensusStatsModule {
volatile long outboundUncompressedSize; volatile long outboundUncompressedSize;
volatile long inboundUncompressedSize; volatile long inboundUncompressedSize;
ClientTracer(CensusStatsModule module, TagContext startCtx) {
this.module = checkNotNull(module, "module");
this.startCtx = checkNotNull(startCtx, "startCtx");
}
@Override @Override
@SuppressWarnings("NonAtomicVolatileUpdate") @SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) { public void outboundWireSize(long bytes) {
@ -224,6 +252,8 @@ public final class CensusStatsModule {
} else { } else {
outboundWireSize += bytes; outboundWireSize += bytes;
} }
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, bytes);
} }
@Override @Override
@ -234,6 +264,8 @@ public final class CensusStatsModule {
} else { } else {
inboundWireSize += bytes; inboundWireSize += bytes;
} }
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, bytes);
} }
@Override @Override
@ -264,6 +296,8 @@ public final class CensusStatsModule {
} else { } else {
inboundMessageCount++; inboundMessageCount++;
} }
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1);
} }
@Override @Override
@ -274,6 +308,8 @@ public final class CensusStatsModule {
} else { } else {
outboundMessageCount++; outboundMessageCount++;
} }
module.recordRealTimeMetric(
startCtx, RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1);
} }
} }
@ -332,7 +368,7 @@ public final class CensusStatsModule {
@Override @Override
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
ClientTracer tracer = new ClientTracer(); ClientTracer tracer = new ClientTracer(module, startCtx);
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them. // one streams. We will need to update this file to support them.
if (streamTracerUpdater != null) { if (streamTracerUpdater != null) {
@ -378,7 +414,7 @@ public final class CensusStatsModule {
long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
ClientTracer tracer = streamTracer; ClientTracer tracer = streamTracer;
if (tracer == null) { if (tracer == null) {
tracer = BLANK_CLIENT_TRACER; tracer = new ClientTracer(module, startCtx);
} }
MeasureMap measureMap = module.statsRecorder.newMeasureMap() MeasureMap measureMap = module.statsRecorder.newMeasureMap()
// TODO(songya): remove the deprecated measure constants once they are completed removed. // TODO(songya): remove the deprecated measure constants once they are completed removed.
@ -502,6 +538,8 @@ public final class CensusStatsModule {
} else { } else {
outboundWireSize += bytes; outboundWireSize += bytes;
} }
module.recordRealTimeMetric(
parentCtx, RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_METHOD, bytes);
} }
@Override @Override
@ -512,6 +550,8 @@ public final class CensusStatsModule {
} else { } else {
inboundWireSize += bytes; inboundWireSize += bytes;
} }
module.recordRealTimeMetric(
parentCtx, RpcMeasureConstants.GRPC_SERVER_RECEIVED_BYTES_PER_METHOD, bytes);
} }
@Override @Override
@ -542,6 +582,8 @@ public final class CensusStatsModule {
} else { } else {
inboundMessageCount++; inboundMessageCount++;
} }
module.recordRealTimeMetric(
parentCtx, RpcMeasureConstants.GRPC_SERVER_RECEIVED_MESSAGES_PER_METHOD, 1);
} }
@Override @Override
@ -552,6 +594,8 @@ public final class CensusStatsModule {
} else { } else {
outboundMessageCount++; outboundMessageCount++;
} }
module.recordRealTimeMetric(
parentCtx, RpcMeasureConstants.GRPC_SERVER_SENT_MESSAGES_PER_METHOD, 1);
} }
/** /**

View File

@ -414,7 +414,7 @@ public class AbstractManagedChannelImplBuilderTest {
new FakeTagContextBinarySerializer(), new FakeTagContextBinarySerializer(),
new FakeStatsRecorder(), new FakeStatsRecorder(),
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true)); true, true, true, true));
} }
Builder(SocketAddress directServerAddress, String authority) { Builder(SocketAddress directServerAddress, String authority) {
@ -425,7 +425,7 @@ public class AbstractManagedChannelImplBuilderTest {
new FakeTagContextBinarySerializer(), new FakeTagContextBinarySerializer(),
new FakeStatsRecorder(), new FakeStatsRecorder(),
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true)); true, true, true, true));
} }
@Override @Override

View File

@ -91,7 +91,7 @@ public class AbstractServerImplBuilderTest {
new FakeTagContextBinarySerializer(), new FakeTagContextBinarySerializer(),
new FakeStatsRecorder(), new FakeStatsRecorder(),
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true)); true, true, true, true));
} }
@Override @Override

View File

@ -63,6 +63,8 @@ import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import io.grpc.testing.GrpcServerRule; import io.grpc.testing.GrpcServerRule;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure;
import io.opencensus.tags.TagContext; import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagValue; import io.opencensus.tags.TagValue;
import io.opencensus.trace.BlankSpan; import io.opencensus.trace.BlankSpan;
@ -192,7 +194,7 @@ public class CensusModulesTest {
censusStats = censusStats =
new CensusStatsModule( new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
true, true, true); true, true, true, false /* real-time */);
censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler);
} }
@ -334,30 +336,36 @@ public class CensusModulesTest {
} }
@Test @Test
public void clientBasicStatsDefaultContext_startsAndFinishes() { public void clientBasicStatsDefaultContext_starts_finishes_noRealTime() {
subtestClientBasicStatsDefaultContext(true, true); subtestClientBasicStatsDefaultContext(true, true, false);
} }
@Test @Test
public void clientBasicStatsDefaultContext_startsOnly() { public void clientBasicStatsDefaultContext_starts_noFinishes_noRealTime() {
subtestClientBasicStatsDefaultContext(true, false); subtestClientBasicStatsDefaultContext(true, false, false);
} }
@Test @Test
public void clientBasicStatsDefaultContext_finishesOnly() { public void clientBasicStatsDefaultContext_noStarts_finishes_noRealTime() {
subtestClientBasicStatsDefaultContext(false, true); subtestClientBasicStatsDefaultContext(false, true, false);
} }
@Test @Test
public void clientBasicStatsDefaultContext_neither() { public void clientBasicStatsDefaultContext_noStarts_noFinishes_noRealTime() {
subtestClientBasicStatsDefaultContext(false, true); subtestClientBasicStatsDefaultContext(false, false, false);
} }
private void subtestClientBasicStatsDefaultContext(boolean recordStarts, boolean recordFinishes) { @Test
public void clientBasicStatsDefaultContext_starts_finishes_realTime() {
subtestClientBasicStatsDefaultContext(true, true, true);
}
private void subtestClientBasicStatsDefaultContext(
boolean recordStarts, boolean recordFinishes, boolean recordRealTime) {
CensusStatsModule localCensusStats = CensusStatsModule localCensusStats =
new CensusStatsModule( new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
true, recordStarts, recordFinishes); true, recordStarts, recordFinishes, recordRealTime);
CensusStatsModule.ClientCallTracer callTracer = CensusStatsModule.ClientCallTracer callTracer =
localCensusStats.newClientCallTracer( localCensusStats.newClientCallTracer(
tagger.empty(), method.getFullMethodName()); tagger.empty(), method.getFullMethodName());
@ -381,21 +389,48 @@ public class CensusModulesTest {
tracer.outboundHeaders(); tracer.outboundHeaders();
fakeClock.forwardTime(100, MILLISECONDS); fakeClock.forwardTime(100, MILLISECONDS);
tracer.outboundMessage(0); tracer.outboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, recordRealTime, true);
tracer.outboundWireSize(1028); tracer.outboundWireSize(1028);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 1028, recordRealTime, true);
tracer.outboundUncompressedSize(1128); tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(16, MILLISECONDS); fakeClock.forwardTime(16, MILLISECONDS);
tracer.inboundMessage(0); tracer.inboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, recordRealTime, true);
tracer.inboundWireSize(33); tracer.inboundWireSize(33);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 33, recordRealTime, true);
tracer.inboundUncompressedSize(67); tracer.inboundUncompressedSize(67);
tracer.outboundMessage(1); tracer.outboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD, 1, recordRealTime, true);
tracer.outboundWireSize(99); tracer.outboundWireSize(99);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD, 99, recordRealTime, true);
tracer.outboundUncompressedSize(865); tracer.outboundUncompressedSize(865);
fakeClock.forwardTime(24, MILLISECONDS); fakeClock.forwardTime(24, MILLISECONDS);
tracer.inboundMessage(1); tracer.inboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD, 1, recordRealTime, true);
tracer.inboundWireSize(154); tracer.inboundWireSize(154);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD, 154, recordRealTime, true);
tracer.inboundUncompressedSize(552); tracer.inboundUncompressedSize(552);
tracer.streamClosed(Status.OK); tracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK); callTracer.callEnded(Status.OK);
@ -436,6 +471,24 @@ public class CensusModulesTest {
} }
} }
private void assertRealTimeMetric(
Measure measure, long expectedValue, boolean recordRealTimeMetrics, boolean clientSide) {
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
if (!recordRealTimeMetrics) {
assertNull(record);
return;
}
assertNotNull(record);
if (clientSide) {
assertNoServerContent(record);
} else {
assertNoClientContent(record);
}
TagValue methodTagOld = record.tags.get(DeprecatedCensusConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTagOld.asString());
assertEquals(expectedValue, record.getMetricAsLongOrFail(measure));
}
@Test @Test
public void clientBasicTracingDefaultSpan() { public void clientBasicTracingDefaultSpan() {
CensusTracingModule.ClientCallTracer callTracer = CensusTracingModule.ClientCallTracer callTracer =
@ -597,7 +650,7 @@ public class CensusModulesTest {
tagCtxSerializer, tagCtxSerializer,
statsRecorder, statsRecorder,
fakeClock.getStopwatchSupplier(), fakeClock.getStopwatchSupplier(),
propagate, recordStats, recordStats); propagate, recordStats, recordStats, recordStats);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
CensusStatsModule.ClientCallTracer callTracer = CensusStatsModule.ClientCallTracer callTracer =
census.newClientCallTracer(clientCtx, method.getFullMethodName()); census.newClientCallTracer(clientCtx, method.getFullMethodName());
@ -813,30 +866,36 @@ public class CensusModulesTest {
} }
@Test @Test
public void serverBasicStatsNoHeaders_startsAndFinishes() { public void serverBasicStatsNoHeaders_starts_finishes_noRealTime() {
subtestServerBasicStatsNoHeaders(true, true); subtestServerBasicStatsNoHeaders(true, true, false);
} }
@Test @Test
public void serverBasicStatsNoHeaders_startsOnly() { public void serverBasicStatsNoHeaders_starts_noFinishes_noRealTime() {
subtestServerBasicStatsNoHeaders(true, false); subtestServerBasicStatsNoHeaders(true, false, false);
} }
@Test @Test
public void serverBasicStatsNoHeaders_finishesOnly() { public void serverBasicStatsNoHeaders_noStarts_finishes_noRealTime() {
subtestServerBasicStatsNoHeaders(false, true); subtestServerBasicStatsNoHeaders(false, true, false);
} }
@Test @Test
public void serverBasicStatsNoHeaders_neither() { public void serverBasicStatsNoHeaders_noStarts_noFinishes_noRealTime() {
subtestServerBasicStatsNoHeaders(false, false); subtestServerBasicStatsNoHeaders(false, false, false);
} }
private void subtestServerBasicStatsNoHeaders(boolean recordStarts, boolean recordFinishes) { @Test
public void serverBasicStatsNoHeaders_starts_finishes_realTime() {
subtestServerBasicStatsNoHeaders(true, true, true);
}
private void subtestServerBasicStatsNoHeaders(
boolean recordStarts, boolean recordFinishes, boolean recordRealTime) {
CensusStatsModule localCensusStats = CensusStatsModule localCensusStats =
new CensusStatsModule( new CensusStatsModule(
tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(),
true, recordStarts, recordFinishes); true, recordStarts, recordFinishes, recordRealTime);
ServerStreamTracer.Factory tracerFactory = localCensusStats.getServerTracerFactory(); ServerStreamTracer.Factory tracerFactory = localCensusStats.getServerTracerFactory();
ServerStreamTracer tracer = ServerStreamTracer tracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
@ -867,20 +926,44 @@ public class CensusModulesTest {
statsCtx); statsCtx);
tracer.inboundMessage(0); tracer.inboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_RECEIVED_MESSAGES_PER_METHOD, 1, recordRealTime, false);
tracer.inboundWireSize(34); tracer.inboundWireSize(34);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_RECEIVED_BYTES_PER_METHOD, 34, recordRealTime, false);
tracer.inboundUncompressedSize(67); tracer.inboundUncompressedSize(67);
fakeClock.forwardTime(100, MILLISECONDS); fakeClock.forwardTime(100, MILLISECONDS);
tracer.outboundMessage(0); tracer.outboundMessage(0);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_SENT_MESSAGES_PER_METHOD, 1, recordRealTime, false);
tracer.outboundWireSize(1028); tracer.outboundWireSize(1028);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_METHOD, 1028, recordRealTime, false);
tracer.outboundUncompressedSize(1128); tracer.outboundUncompressedSize(1128);
fakeClock.forwardTime(16, MILLISECONDS); fakeClock.forwardTime(16, MILLISECONDS);
tracer.inboundMessage(1); tracer.inboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_RECEIVED_MESSAGES_PER_METHOD, 1, recordRealTime, false);
tracer.inboundWireSize(154); tracer.inboundWireSize(154);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_RECEIVED_BYTES_PER_METHOD, 154, recordRealTime, false);
tracer.inboundUncompressedSize(552); tracer.inboundUncompressedSize(552);
tracer.outboundMessage(1); tracer.outboundMessage(1);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_SENT_MESSAGES_PER_METHOD, 1, recordRealTime, false);
tracer.outboundWireSize(99); tracer.outboundWireSize(99);
assertRealTimeMetric(
RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_METHOD, 99, recordRealTime, false);
tracer.outboundUncompressedSize(865); tracer.outboundUncompressedSize(865);
fakeClock.forwardTime(24, MILLISECONDS); fakeClock.forwardTime(24, MILLISECONDS);

View File

@ -228,7 +228,7 @@ public abstract class AbstractInteropTest {
tagContextBinarySerializer, tagContextBinarySerializer,
serverStatsRecorder, serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true)); true, true, true, false /* real-time metrics */));
try { try {
server = builder.build().start(); server = builder.build().start();
} catch (IOException ex) { } catch (IOException ex) {
@ -331,7 +331,7 @@ public abstract class AbstractInteropTest {
protected final CensusStatsModule createClientCensusStatsModule() { protected final CensusStatsModule createClientCensusStatsModule() {
return new CensusStatsModule( return new CensusStatsModule(
tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER,
true, true, true); true, true, true, false /* real-time metrics */);
} }
/** /**

View File

@ -62,6 +62,10 @@ public final class InternalNettyChannelBuilder {
builder.setStatsRecordStartedRpcs(value); builder.setStatsRecordStartedRpcs(value);
} }
public static void setStatsRecordRealTimeMetrics(NettyChannelBuilder builder, boolean value) {
builder.setStatsRecordRealTimeMetrics(value);
}
public static ClientTransportFactory buildTransportFactory(NettyChannelBuilder builder) { public static ClientTransportFactory buildTransportFactory(NettyChannelBuilder builder) {
return builder.buildTransportFactory(); return builder.buildTransportFactory();
} }

View File

@ -33,6 +33,10 @@ public final class InternalNettyServerBuilder {
builder.setStatsRecordStartedRpcs(value); builder.setStatsRecordStartedRpcs(value);
} }
public static void setStatsRecordRealTimeMetrics(NettyServerBuilder builder, boolean value) {
builder.setStatsRecordRealTimeMetrics(value);
}
public static void setTracingEnabled(NettyServerBuilder builder, boolean value) { public static void setTracingEnabled(NettyServerBuilder builder, boolean value) {
builder.setTracingEnabled(value); builder.setTracingEnabled(value);
} }

View File

@ -471,6 +471,11 @@ public final class NettyChannelBuilder
super.setStatsRecordStartedRpcs(value); super.setStatsRecordStartedRpcs(value);
} }
@Override
protected void setStatsRecordRealTimeMetrics(boolean value) {
super.setStatsRecordRealTimeMetrics(value);
}
@VisibleForTesting @VisibleForTesting
NettyChannelBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) { NettyChannelBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) {
this.transportTracerFactory = transportTracerFactory; this.transportTracerFactory = transportTracerFactory;

View File

@ -228,6 +228,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
super.setStatsRecordStartedRpcs(value); super.setStatsRecordStartedRpcs(value);
} }
@Override
protected void setStatsRecordRealTimeMetrics(boolean value) {
super.setStatsRecordRealTimeMetrics(value);
}
@VisibleForTesting @VisibleForTesting
NettyServerBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) { NettyServerBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) {
this.transportTracerFactory = transportTracerFactory; this.transportTracerFactory = transportTracerFactory;

View File

@ -302,15 +302,15 @@ def io_netty_transport():
def io_opencensus_api(): def io_opencensus_api():
native.maven_jar( native.maven_jar(
name = "io_opencensus_opencensus_api", name = "io_opencensus_opencensus_api",
artifact = "io.opencensus:opencensus-api:0.17.0", artifact = "io.opencensus:opencensus-api:0.18.0",
sha1 = "0b9c91321f9c9f20f3a4627bfd9e3097164f85e6", sha1 = "b89a8f8dfd1e1e0d68d83c82a855624814b19a6e",
) )
def io_opencensus_grpc_metrics(): def io_opencensus_grpc_metrics():
native.maven_jar( native.maven_jar(
name = "io_opencensus_opencensus_contrib_grpc_metrics", name = "io_opencensus_opencensus_contrib_grpc_metrics",
artifact = "io.opencensus:opencensus-contrib-grpc-metrics:0.17.0", artifact = "io.opencensus:opencensus-contrib-grpc-metrics:0.18.0",
sha1 = "4b82972073361704f57fa2107910242f1143df25", sha1 = "8e90fab2930b6a0e67dab48911b9c936470d43dd",
) )
def javax_annotation(): def javax_annotation():