core: move census registration into internal

also preserve tracing bit when rebuilding a MethodDescriptor
This commit is contained in:
Carl Mastrangelo 2017-09-29 18:20:37 -07:00 committed by GitHub
parent cbcab9b498
commit 02466744bb
6 changed files with 153 additions and 44 deletions

View File

@ -38,7 +38,10 @@ public final class InternalMethodDescriptor {
md.setRawMethodName(transport.ordinal(), o); md.setRawMethodName(transport.ordinal(), o);
} }
public static String generateTraceSpanName(boolean isServer, String fullMethodName) { public interface RegisterForTracingCallback extends
return MethodDescriptor.generateTraceSpanName(isServer, fullMethodName); MethodDescriptor.Registrations.RegisterForTracingCallback {}
public static void setRegisterCallback(RegisterForTracingCallback registerCallback) {
MethodDescriptor.Registrations.setRegisterForTracingCallback(registerCallback);
} }
} }

View File

@ -17,12 +17,15 @@
package io.grpc; package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.SampledSpanStore;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -48,11 +51,16 @@ public final class MethodDescriptor<ReqT, RespT> {
private final @Nullable Object schemaDescriptor; private final @Nullable Object schemaDescriptor;
private final boolean idempotent; private final boolean idempotent;
private final boolean safe; private final boolean safe;
// This field is not exposed, since the result would be misleading. Setting this value to true,
// ensures that the method is traced, but false means that it might still be traced, due to
// another descriptor tracing the same name.
private final boolean registerForTracing;
// Must be set to InternalKnownTransport.values().length // Must be set to InternalKnownTransport.values().length
// Not referenced to break the dependency. // Not referenced to break the dependency.
private final AtomicReferenceArray<Object> rawMethodNames = new AtomicReferenceArray<Object>(1); private final AtomicReferenceArray<Object> rawMethodNames = new AtomicReferenceArray<Object>(1);
/** /**
* Gets the cached "raw" method name for this Method Descriptor. The raw name is transport * Gets the cached "raw" method name for this Method Descriptor. The raw name is transport
* specific, and should be set using {@link #setRawMethodName} by the transport. * specific, and should be set using {@link #setRawMethodName} by the transport.
@ -73,18 +81,6 @@ public final class MethodDescriptor<ReqT, RespT> {
rawMethodNames.lazySet(transportOrdinal, o); rawMethodNames.lazySet(transportOrdinal, o);
} }
/**
* Convert a full method name to a tracing span name.
*
* @param isServer {@code false} if the span is on the client-side, {@code true} if on the
* server-side
* @param fullMethodName the method name as returned by {@link #getFullMethodName}.
*/
static String generateTraceSpanName(boolean isServer, String fullMethodName) {
String prefix = isServer ? "Recv" : "Sent";
return prefix + "." + fullMethodName.replace('/', '.');
}
/** /**
* The call type of a method. * The call type of a method.
* *
@ -224,7 +220,7 @@ public final class MethodDescriptor<ReqT, RespT> {
Marshaller<RequestT> requestMarshaller, Marshaller<RequestT> requestMarshaller,
Marshaller<ResponseT> responseMarshaller) { Marshaller<ResponseT> responseMarshaller) {
return new MethodDescriptor<RequestT, ResponseT>( return new MethodDescriptor<RequestT, ResponseT>(
type, fullMethodName, requestMarshaller, responseMarshaller, null, false, false); type, fullMethodName, requestMarshaller, responseMarshaller, null, false, false, false);
} }
private MethodDescriptor( private MethodDescriptor(
@ -234,7 +230,8 @@ public final class MethodDescriptor<ReqT, RespT> {
Marshaller<RespT> responseMarshaller, Marshaller<RespT> responseMarshaller,
Object schemaDescriptor, Object schemaDescriptor,
boolean idempotent, boolean idempotent,
boolean safe) { boolean safe,
boolean registerForTracing) {
this.type = Preconditions.checkNotNull(type, "type"); this.type = Preconditions.checkNotNull(type, "type");
this.fullMethodName = Preconditions.checkNotNull(fullMethodName, "fullMethodName"); this.fullMethodName = Preconditions.checkNotNull(fullMethodName, "fullMethodName");
@ -243,8 +240,12 @@ public final class MethodDescriptor<ReqT, RespT> {
this.schemaDescriptor = schemaDescriptor; this.schemaDescriptor = schemaDescriptor;
this.idempotent = idempotent; this.idempotent = idempotent;
this.safe = safe; this.safe = safe;
this.registerForTracing = registerForTracing;
Preconditions.checkArgument(!safe || type == MethodType.UNARY, Preconditions.checkArgument(!safe || type == MethodType.UNARY,
"Only unary methods can be specified safe"); "Only unary methods can be specified safe");
if (registerForTracing) {
Registrations.registerForTracing(this);
}
} }
/** /**
@ -441,7 +442,8 @@ public final class MethodDescriptor<ReqT, RespT> {
.setType(type) .setType(type)
.setFullMethodName(fullMethodName) .setFullMethodName(fullMethodName)
.setIdempotent(idempotent) .setIdempotent(idempotent)
.setSafe(safe); .setSafe(safe)
.setRegisterForTracing(registerForTracing);
} }
/** /**
@ -563,15 +565,6 @@ public final class MethodDescriptor<ReqT, RespT> {
*/ */
@CheckReturnValue @CheckReturnValue
public MethodDescriptor<ReqT, RespT> build() { public MethodDescriptor<ReqT, RespT> build() {
if (registerForTracing) {
SampledSpanStore sampledStore = Tracing.getExportComponent().getSampledSpanStore();
if (sampledStore != null) {
ArrayList<String> spanNames = new ArrayList<String>(2);
spanNames.add(generateTraceSpanName(false, fullMethodName));
spanNames.add(generateTraceSpanName(true, fullMethodName));
sampledStore.registerSpanNamesForCollection(spanNames);
}
}
return new MethodDescriptor<ReqT, RespT>( return new MethodDescriptor<ReqT, RespT>(
type, type,
fullMethodName, fullMethodName,
@ -579,7 +572,73 @@ public final class MethodDescriptor<ReqT, RespT> {
responseMarshaller, responseMarshaller,
schemaDescriptor, schemaDescriptor,
idempotent, idempotent,
safe); safe,
registerForTracing);
}
}
static final class Registrations {
private static final ReferenceQueue<MethodDescriptor<?, ?>> droppedMethodDescriptors =
new ReferenceQueue<MethodDescriptor<?, ?>>();
private static final Collection<WeakReference<MethodDescriptor<?, ?>>> pendingRegistrations =
new LinkedList<WeakReference<MethodDescriptor<?, ?>>>();
private static volatile RegisterForTracingCallback registerCallback;
interface RegisterForTracingCallback {
void onRegister(MethodDescriptor<?, ?> md);
}
/**
* Sets a callback for method descriptor builds. Called for descriptors with
* {@link MethodDescriptor#registerForTracing} set. This should only be called by
* {@link io.grpc.internal.CensusTracingModule} since it will only work for one invocation.
*
* @param registerCallback the callback to handle descriptor registration.
*/
static synchronized void setRegisterForTracingCallback(
RegisterForTracingCallback registerCallback) {
checkState(Registrations.registerCallback == null, "callback already present");
Registrations.registerCallback = checkNotNull(registerCallback, "registerCallback");
for (WeakReference<MethodDescriptor<?, ?>> mdRef : pendingRegistrations) {
MethodDescriptor<?, ?> md = mdRef.get();
if (md != null) {
mdRef.clear();
registerCallback.onRegister(md);
}
}
drainDroppedMethodDescriptors();
}
private static synchronized void drainDroppedMethodDescriptors() {
boolean found = false;
while (droppedMethodDescriptors.poll() != null) {
found = true;
}
if (found) {
Iterator<WeakReference<MethodDescriptor<?, ?>>> it = pendingRegistrations.iterator();
while (it.hasNext()) {
if (it.next().get() == null) {
it.remove();
}
}
}
}
private static void registerForTracing(MethodDescriptor<?, ?> md) {
RegisterForTracingCallback reg;
if ((reg = registerCallback) == null) {
synchronized (MethodDescriptor.class) {
if ((reg = registerCallback) == null) {
pendingRegistrations.add(
new WeakReference<MethodDescriptor<?, ?>>(md, droppedMethodDescriptors));
drainDroppedMethodDescriptors();
return;
}
}
}
reg.onRegister(md);
} }
} }
} }

View File

@ -29,6 +29,7 @@ import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.InternalMethodDescriptor; import io.grpc.InternalMethodDescriptor;
import io.grpc.InternalMethodDescriptor.RegisterForTracingCallback;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
@ -39,7 +40,11 @@ import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext; import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Status; import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.SampledSpanStore;
import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.propagation.BinaryFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -65,6 +70,24 @@ final class CensusTracingModule {
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor(); private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
private static final boolean isRegistered = registerCensusTracer();
private static boolean registerCensusTracer() {
InternalMethodDescriptor.setRegisterCallback(new RegisterForTracingCallback() {
@Override
public void onRegister(MethodDescriptor<?, ?> md) {
SampledSpanStore sampledStore = Tracing.getExportComponent().getSampledSpanStore();
if (sampledStore != null) {
List<String> spanNames = new ArrayList<String>(2);
spanNames.add(generateTraceSpanName(false, md.getFullMethodName()));
spanNames.add(generateTraceSpanName(true, md.getFullMethodName()));
sampledStore.registerSpanNamesForCollection(spanNames);
}
}
});
return true;
}
CensusTracingModule( CensusTracingModule(
Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) { Tracer censusTracer, final BinaryFormat censusPropagationBinaryFormat) {
this.censusTracer = checkNotNull(censusTracer, "censusTracer"); this.censusTracer = checkNotNull(censusTracer, "censusTracer");
@ -202,7 +225,7 @@ final class CensusTracingModule {
this.span = this.span =
censusTracer censusTracer
.spanBuilderWithExplicitParent( .spanBuilderWithExplicitParent(
InternalMethodDescriptor.generateTraceSpanName(false, fullMethodName), generateTraceSpanName(false, fullMethodName),
parentSpan) parentSpan)
.setRecordEvents(true) .setRecordEvents(true)
.startSpan(); .startSpan();
@ -260,7 +283,7 @@ final class CensusTracingModule {
this.span = this.span =
censusTracer censusTracer
.spanBuilderWithRemoteParent( .spanBuilderWithRemoteParent(
InternalMethodDescriptor.generateTraceSpanName(true, fullMethodName), generateTraceSpanName(true, fullMethodName),
remoteSpan) remoteSpan)
.setRecordEvents(true) .setRecordEvents(true)
.startSpan(); .startSpan();
@ -345,4 +368,19 @@ final class CensusTracingModule {
}; };
} }
} }
/**
* Convert a full method name to a tracing span name.
*
* @param isServer {@code false} if the span is on the client-side, {@code true} if on the
* server-side
* @param fullMethodName the method name as returned by
* {@link MethodDescriptor#getFullMethodName}.
*/
@VisibleForTesting
static String generateTraceSpanName(boolean isServer, String fullMethodName) {
String prefix = isServer ? "Recv" : "Sent";
return prefix + "." + fullMethodName.replace('/', '.');
}
} }

View File

@ -94,12 +94,4 @@ public class MethodDescriptorTest {
// Never reached // Never reached
assert discard == null; assert discard == null;
} }
@Test
public void generateTraceSpanName() {
assertEquals(
"Sent.io.grpc.Foo", MethodDescriptor.generateTraceSpanName(false, "io.grpc/Foo"));
assertEquals(
"Recv.io.grpc.Bar", MethodDescriptor.generateTraceSpanName(true, "io.grpc/Bar"));
}
} }

View File

@ -725,6 +725,15 @@ public class CensusModulesTest {
} }
} }
@Test
public void generateTraceSpanName() {
assertEquals(
"Sent.io.grpc.Foo", CensusTracingModule.generateTraceSpanName(false, "io.grpc/Foo"));
assertEquals(
"Recv.io.grpc.Bar", CensusTracingModule.generateTraceSpanName(true, "io.grpc/Bar"));
}
private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT)); assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT));

View File

@ -24,7 +24,6 @@ import static io.grpc.MethodDescriptor.MethodType.UNARY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import io.grpc.InternalMethodDescriptor;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.opencensus.trace.Tracing; import io.opencensus.trace.Tracing;
import io.opencensus.trace.export.SampledSpanStore; import io.opencensus.trace.export.SampledSpanStore;
@ -61,8 +60,9 @@ public class SimpleServiceTest {
@Test @Test
public void registerSampledMethodsForTracing() throws Exception { public void registerSampledMethodsForTracing() throws Exception {
// Make sure SimpleServiceGrpc class is loaded // Make sure SimpleServiceGrpc and CensusTracingModule classes are loaded.
assertNotNull(Class.forName(SimpleServiceGrpc.class.getName())); assertNotNull(Class.forName(SimpleServiceGrpc.class.getName()));
assertNotNull(Class.forName("io.grpc.internal.CensusTracingModule"));
String[] methodNames = new String[] { String[] methodNames = new String[] {
"grpc.testing.SimpleService/UnaryRpc", "grpc.testing.SimpleService/UnaryRpc",
@ -72,12 +72,20 @@ public class SimpleServiceTest {
ArrayList<String> expectedSpans = new ArrayList<String>(); ArrayList<String> expectedSpans = new ArrayList<String>();
for (String methodName : methodNames) { for (String methodName : methodNames) {
expectedSpans.add(InternalMethodDescriptor.generateTraceSpanName(false, methodName)); expectedSpans.add(generateTraceSpanName(false, methodName));
expectedSpans.add(InternalMethodDescriptor.generateTraceSpanName(true, methodName)); expectedSpans.add(generateTraceSpanName(true, methodName));
} }
SampledSpanStore sampledStore = Tracing.getExportComponent().getSampledSpanStore(); SampledSpanStore sampledStore = Tracing.getExportComponent().getSampledSpanStore();
Set<String> registeredSpans = sampledStore.getRegisteredSpanNamesForCollection(); Set<String> registeredSpans = sampledStore.getRegisteredSpanNamesForCollection();
assertThat(registeredSpans).containsAllIn(expectedSpans); assertThat(registeredSpans).containsAllIn(expectedSpans);
} }
/**
* Copy of {@link io.grpc.internal.CensusTracingModule#generateTraceSpanName} to break dependency.
*/
private static String generateTraceSpanName(boolean isServer, String fullMethodName) {
String prefix = isServer ? "Recv" : "Sent";
return prefix + "." + fullMethodName.replace('/', '.');
}
} }