From 2961857451abba72312e650d03041c7ab00a588b Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 28 Nov 2018 14:20:40 -0800 Subject: [PATCH] core: refactor flags in CensusStatsModule. (#5095) There are currently three boolean flags, and there will be one more soon. Put them all in the top-level class instead of passing them as arguments on lower levels. --- .../AbstractManagedChannelImplBuilder.java | 6 +- .../internal/AbstractServerImplBuilder.java | 6 +- .../io/grpc/internal/CensusStatsModule.java | 93 ++++++------------- ...AbstractManagedChannelImplBuilderTest.java | 4 +- .../AbstractServerImplBuilderTest.java | 2 +- .../io/grpc/internal/CensusModulesTest.java | 34 ++++--- .../integration/AbstractInteropTest.java | 5 +- 7 files changed, 60 insertions(+), 90 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 160653c032..bca189f52a 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -432,12 +432,12 @@ public abstract class AbstractManagedChannelImplBuilder temporarilyDisableRetry = true; CensusStatsModule censusStats = this.censusStatsOverride; if (censusStats == null) { - censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); + censusStats = new CensusStatsModule( + GrpcUtil.STOPWATCH_SUPPLIER, true, recordStartedRpcs, recordFinishedRpcs); } // First interceptor runs last (see ClientInterceptors.intercept()), so that no // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add( - 0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs)); + effectiveInterceptors.add(0, censusStats.getClientInterceptor()); } if (tracingEnabled) { temporarilyDisableRetry = true; diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 72744d8009..b9396e4f40 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -265,10 +265,10 @@ public abstract class AbstractServerImplBuilder statsHeader; private final boolean propagateTags; + private final boolean recordStartedRpcs; + private final boolean recordFinishedRpcs; /** * Creates a {@link CensusStatsModule} with the default OpenCensus implementation. */ - CensusStatsModule(Supplier stopwatchSupplier, boolean propagateTags) { + CensusStatsModule(Supplier stopwatchSupplier, + boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs) { this( Tags.getTagger(), Tags.getTagPropagationComponent().getBinarySerializer(), Stats.getStatsRecorder(), stopwatchSupplier, - propagateTags); + propagateTags, recordStartedRpcs, recordFinishedRpcs); } /** @@ -95,12 +98,14 @@ public final class CensusStatsModule { final Tagger tagger, final TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder, Supplier stopwatchSupplier, - boolean propagateTags) { + boolean propagateTags, boolean recordStartedRpcs, boolean recordFinishedRpcs) { this.tagger = checkNotNull(tagger, "tagger"); this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); checkNotNull(tagCtxSerializer, "tagCtxSerializer"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.propagateTags = propagateTags; + this.recordStartedRpcs = recordStartedRpcs; + this.recordFinishedRpcs = recordFinishedRpcs; this.statsHeader = Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { @Override @@ -131,25 +136,22 @@ public final class CensusStatsModule { */ @VisibleForTesting ClientCallTracer newClientCallTracer( - TagContext parentCtx, String fullMethodName, - boolean recordStartedRpcs, boolean recordFinishedRpcs) { - return new ClientCallTracer( - this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs); + TagContext parentCtx, String fullMethodName) { + return new ClientCallTracer(this, parentCtx, fullMethodName); } /** * Returns the server tracer factory. */ - ServerStreamTracer.Factory getServerTracerFactory( - boolean recordStartedRpcs, boolean recordFinishedRpcs) { - return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs); + ServerStreamTracer.Factory getServerTracerFactory() { + return new ServerTracerFactory(); } /** * Returns the client interceptor that facilitates Census-based stats reporting. */ - ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) { - return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs); + ClientInterceptor getClientInterceptor() { + return new StatsClientInterceptor(); } private static final class ClientTracer extends ClientStreamTracer { @@ -275,8 +277,6 @@ public final class CensusStatsModule { } } - - @VisibleForTesting static final class ClientCallTracer extends ClientStreamTracer.Factory { @Nullable @@ -314,24 +314,16 @@ public final class CensusStatsModule { private volatile int callEnded; private final TagContext parentCtx; private final TagContext startCtx; - private final boolean recordFinishedRpcs; - ClientCallTracer( - CensusStatsModule module, - TagContext parentCtx, - String fullMethodName, - boolean recordStartedRpcs, - boolean recordFinishedRpcs) { - this.module = module; + ClientCallTracer(CensusStatsModule module, TagContext parentCtx, String fullMethodName) { + this.module = checkNotNull(module); this.parentCtx = checkNotNull(parentCtx); TagValue methodTag = TagValue.create(fullMethodName); - this.startCtx = - module.tagger.toBuilder(parentCtx) + this.startCtx = module.tagger.toBuilder(parentCtx) .put(DeprecatedCensusConstants.RPC_METHOD, methodTag) .build(); this.stopwatch = module.stopwatchSupplier.get().start(); - this.recordFinishedRpcs = recordFinishedRpcs; - if (recordStartedRpcs) { + if (module.recordStartedRpcs) { module.statsRecorder.newMeasureMap() .put(DeprecatedCensusConstants.RPC_CLIENT_STARTED_COUNT, 1) .record(startCtx); @@ -379,7 +371,7 @@ public final class CensusStatsModule { } callEnded = 1; } - if (!recordFinishedRpcs) { + if (!module.recordFinishedRpcs) { return; } stopwatch.stop(); @@ -482,8 +474,6 @@ public final class CensusStatsModule { private final TagContext parentCtx; private volatile int streamClosed; private final Stopwatch stopwatch; - private final Tagger tagger; - private final boolean recordFinishedRpcs; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -493,17 +483,11 @@ public final class CensusStatsModule { ServerTracer( CensusStatsModule module, - TagContext parentCtx, - Supplier stopwatchSupplier, - Tagger tagger, - boolean recordStartedRpcs, - boolean recordFinishedRpcs) { - this.module = module; + TagContext parentCtx) { + this.module = checkNotNull(module, "module"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); - this.stopwatch = stopwatchSupplier.get().start(); - this.tagger = tagger; - this.recordFinishedRpcs = recordFinishedRpcs; - if (recordStartedRpcs) { + this.stopwatch = module.stopwatchSupplier.get().start(); + if (module.recordStartedRpcs) { module.statsRecorder.newMeasureMap() .put(DeprecatedCensusConstants.RPC_SERVER_STARTED_COUNT, 1) .record(parentCtx); @@ -588,7 +572,7 @@ public final class CensusStatsModule { } streamClosed = 1; } - if (!recordFinishedRpcs) { + if (!module.recordFinishedRpcs) { return; } stopwatch.stop(); @@ -624,7 +608,7 @@ public final class CensusStatsModule { @Override public Context filterContext(Context context) { - if (!tagger.empty().equals(parentCtx)) { + if (!module.tagger.empty().equals(parentCtx)) { return context.withValue(TAG_CONTEXT_KEY, parentCtx); } return context; @@ -633,14 +617,6 @@ public final class CensusStatsModule { @VisibleForTesting final class ServerTracerFactory extends ServerStreamTracer.Factory { - private final boolean recordStartedRpcs; - private final boolean recordFinishedRpcs; - - ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) { - this.recordStartedRpcs = recordStartedRpcs; - this.recordFinishedRpcs = recordFinishedRpcs; - } - @Override public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { TagContext parentCtx = headers.get(statsHeader); @@ -653,34 +629,19 @@ public final class CensusStatsModule { .toBuilder(parentCtx) .put(DeprecatedCensusConstants.RPC_METHOD, methodTag) .build(); - return new ServerTracer( - CensusStatsModule.this, - parentCtx, - stopwatchSupplier, - tagger, - recordStartedRpcs, - recordFinishedRpcs); + return new ServerTracer(CensusStatsModule.this, parentCtx); } } @VisibleForTesting final class StatsClientInterceptor implements ClientInterceptor { - private final boolean recordStartedRpcs; - private final boolean recordFinishedRpcs; - - StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) { - this.recordStartedRpcs = recordStartedRpcs; - this.recordFinishedRpcs = recordFinishedRpcs; - } - @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { // New RPCs on client-side inherit the tag context from the current Context. TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = - newClientCallTracer(parentCtx, method.getFullMethodName(), - recordStartedRpcs, recordFinishedRpcs); + newClientCallTracer(parentCtx, method.getFullMethodName()); ClientCall call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall(call) { diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index b6ab80eac3..f88797cfa8 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -414,7 +414,7 @@ public class AbstractManagedChannelImplBuilderTest { new FakeTagContextBinarySerializer(), new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, - true)); + true, true, true)); } Builder(SocketAddress directServerAddress, String authority) { @@ -425,7 +425,7 @@ public class AbstractManagedChannelImplBuilderTest { new FakeTagContextBinarySerializer(), new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, - true)); + true, true, true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index db4def2d07..e0fd106d8d 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -91,7 +91,7 @@ public class AbstractServerImplBuilderTest { new FakeTagContextBinarySerializer(), new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, - true)); + true, true, true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index b4fb69aa9f..1b14363f6e 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -191,7 +191,8 @@ public class CensusModulesTest { .thenReturn(fakeClientSpanContext); censusStats = new CensusStatsModule( - tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true); + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), + true, true, true); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); } @@ -240,7 +241,7 @@ public class CensusModulesTest { Channel interceptedChannel = ClientInterceptors.intercept( grpcServerRule.getChannel(), callOptionsCaptureInterceptor, - censusStats.getClientInterceptor(true, true), censusTracing.getClientInterceptor()); + censusStats.getClientInterceptor(), censusTracing.getClientInterceptor()); ClientCall call; if (nonDefaultContext) { Context ctx = @@ -353,9 +354,13 @@ public class CensusModulesTest { } private void subtestClientBasicStatsDefaultContext(boolean recordStarts, boolean recordFinishes) { + CensusStatsModule localCensusStats = + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), + true, recordStarts, recordFinishes); CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer( - tagger.empty(), method.getFullMethodName(), recordStarts, recordFinishes); + localCensusStats.newClientCallTracer( + tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); @@ -490,8 +495,7 @@ public class CensusModulesTest { @Test public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer( - tagger.empty(), method.getFullMethodName(), true, true); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); @@ -593,10 +597,10 @@ public class CensusModulesTest { tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), - propagate); + propagate, recordStats, recordStats); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = - census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats, recordStats); + census.newClientCallTracer(clientCtx, method.getFullMethodName()); // This propagates clientCtx to headers if propagates==true callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); if (recordStats) { @@ -619,8 +623,7 @@ public class CensusModulesTest { } ServerStreamTracer serverTracer = - census.getServerTracerFactory(recordStats, recordStats).newServerStreamTracer( - method.getFullMethodName(), headers); + census.getServerTracerFactory().newServerStreamTracer(method.getFullMethodName(), headers); // Server tracer deserializes clientCtx from the headers, so that it records stats with the // propagated tags. Context serverContext = serverTracer.filterContext(Context.ROOT); @@ -686,10 +689,12 @@ public class CensusModulesTest { @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), false, false); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); + // Clear recorded stats to satisfy the assertions in wrapUp() + statsRecorder.rolloverRecords(); } @Test @@ -828,8 +833,11 @@ public class CensusModulesTest { } private void subtestServerBasicStatsNoHeaders(boolean recordStarts, boolean recordFinishes) { - ServerStreamTracer.Factory tracerFactory = - censusStats.getServerTracerFactory(recordStarts, recordFinishes); + CensusStatsModule localCensusStats = + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), + true, recordStarts, recordFinishes); + ServerStreamTracer.Factory tracerFactory = localCensusStats.getServerTracerFactory(); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 5a92a1e705..17cb821c0f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -228,7 +228,7 @@ public abstract class AbstractInteropTest { tagContextBinarySerializer, serverStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, - true)); + true, true, true)); try { server = builder.build().start(); } catch (IOException ex) { @@ -330,7 +330,8 @@ public abstract class AbstractInteropTest { protected final CensusStatsModule createClientCensusStatsModule() { return new CensusStatsModule( - tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true); + tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, + true, true, true); } /**