Instrument gRPC

Adds spans for the duration of the connection and child spans for each message.
Also propagates the trace using the metadata object.
This commit is contained in:
Tyler Benson 2018-07-19 12:57:13 +10:00
parent aa3fc0717e
commit a34f7b849b
17 changed files with 1125 additions and 6 deletions

View File

@ -0,0 +1,74 @@
apply plugin: 'version-scan'
versionScan {
group = "io.grpc"
module = "grpc-core"
versions = "[1.5.0,)"
verifyPresent = [
"io.grpc.InternalServerInterceptors": null,
]
}
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.6'
}
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'idea'
apply plugin: 'com.google.protobuf'
def grpcVersion = '1.5.0'
protobuf {
protoc {
// Download compiler rather than using locally installed version:
artifact = 'com.google.protobuf:protoc:3.3.0'
}
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
generateProtoTasks {
all()*.plugins { grpc {} }
}
}
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.grpc', name: 'grpc-core', version: grpcVersion
compile project(':dd-trace-ot')
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
compile deps.autoservice
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile group: 'io.grpc', name: 'grpc-netty', version: grpcVersion
testCompile group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion
testCompile group: 'io.grpc', name: 'grpc-stub', version: grpcVersion
latestDepTestCompile sourceSets.test.output // include the protobuf generated classes
}
configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.grpc', name: 'grpc-netty', version: '+'
force group: 'io.grpc', name: 'grpc-protobuf', version: '+'
force group: 'io.grpc', name: 'grpc-stub', version: '+'
}
}

View File

@ -0,0 +1,72 @@
package datadog.trace.instrumentation.grpc.client;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import io.grpc.ClientInterceptor;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
public GrpcClientBuilderInstrumentation() {
super("grpc", "grpc-client");
}
@Override
public ElementMatcher typeMatcher() {
return named("io.grpc.internal.AbstractManagedChannelImplBuilder");
}
@Override
public ElementMatcher<? super ClassLoader> classLoaderMatcher() {
return classLoaderHasClasses("io.grpc.InternalServerInterceptors");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.grpc.client.GrpcInjectAdapter",
"datadog.trace.instrumentation.grpc.client.TracingClientInterceptor",
"datadog.trace.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCall",
"datadog.trace.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCallListener",
};
}
@Override
public Map<ElementMatcher, String> transformers() {
return Collections.<ElementMatcher, String>singletonMap(
isMethod().and(named("build")), AddInterceptorAdvice.class.getName());
}
@Override
protected boolean defaultEnabled() {
return false;
}
public static class AddInterceptorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void addInterceptor(
@Advice.FieldValue("interceptors") final List<ClientInterceptor> interceptors) {
boolean shouldRegister = true;
for (final ClientInterceptor interceptor : interceptors) {
if (interceptor instanceof TracingClientInterceptor) {
shouldRegister = false;
break;
}
}
if (shouldRegister) {
interceptors.add(0, new TracingClientInterceptor(GlobalTracer.get()));
}
}
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.grpc.client;
import io.grpc.Metadata;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public final class GrpcInjectAdapter implements TextMap {
private final Metadata metadata;
public GrpcInjectAdapter(final Metadata metadata) {
this.metadata = metadata;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"GrpcInjectAdapter should only be used with Tracer.inject()");
}
@Override
public void put(final String key, final String value) {
this.metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}
}

View File

@ -0,0 +1,171 @@
package datadog.trace.instrumentation.grpc.client;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import java.util.Collections;
public class TracingClientInterceptor implements ClientInterceptor {
private final Tracer tracer;
public TracingClientInterceptor(final Tracer tracer) {
this.tracer = tracer;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel next) {
final Scope scope =
tracer
.buildSpan("grpc.client")
.withTag(DDTags.RESOURCE_NAME, method.getFullMethodName())
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.startActive(false);
final Span span = scope.span();
final ClientCall<ReqT, RespT> result;
try {
// call other interceptors
result = next.newCall(method, callOptions);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
} finally {
scope.close();
}
return new TracingClientCall<>(tracer, span, result);
}
static final class TracingClientCall<ReqT, RespT>
extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
final Tracer tracer;
final Span span;
TracingClientCall(
final Tracer tracer, final Span span, final ClientCall<ReqT, RespT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers));
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
@Override
public void sendMessage(final ReqT message) {
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
super.sendMessage(message);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
}
static final class TracingClientCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
final Tracer tracer;
final Span span;
TracingClientCallListener(
final Tracer tracer, final Span span, final ClientCall.Listener<RespT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void onMessage(final RespT message) {
final Scope scope =
tracer
.buildSpan("grpc.message")
.asChildOf(span)
.withTag("message.type", message.getClass().getName())
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.startActive(true);
try {
delegate().onMessage(message);
} catch (final RuntimeException | Error e) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
this.span.log(Collections.singletonMap(ERROR_OBJECT, e));
this.span.finish();
throw e;
} finally {
scope.close();
}
}
@Override
public void onClose(final Status status, final Metadata trailers) {
span.setTag("status.code", status.getCode().name());
if (status.getDescription() != null) {
span.setTag("status.description", status.getDescription());
}
if (!status.isOk()) {
Tags.ERROR.set(span, true);
}
if (status.getCause() != null) {
span.log(Collections.singletonMap(ERROR_OBJECT, status.getCause()));
}
// Finishes span.
try (final Scope ignored = tracer.scopeManager().activate(span, true)) {
delegate().onClose(status, trailers);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
@Override
public void onReady() {
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
delegate().onReady();
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
}
}

View File

@ -0,0 +1,70 @@
package datadog.trace.instrumentation.grpc.server;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import io.grpc.ServerInterceptor;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class GrpcServerBuilderInstrumentation extends Instrumenter.Default {
public GrpcServerBuilderInstrumentation() {
super("grpc", "grpc-server");
}
@Override
public ElementMatcher typeMatcher() {
return named("io.grpc.internal.AbstractServerImplBuilder");
}
@Override
public ElementMatcher<? super ClassLoader> classLoaderMatcher() {
return classLoaderHasClasses("io.grpc.InternalServerInterceptors");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.grpc.server.TracingServerInterceptor",
"datadog.trace.instrumentation.grpc.server.TracingServerInterceptor$TracingServerCallListener",
};
}
@Override
public Map<ElementMatcher, String> transformers() {
return Collections.<ElementMatcher, String>singletonMap(
isMethod().and(named("build")), AddInterceptorAdvice.class.getName());
}
@Override
protected boolean defaultEnabled() {
return false;
}
public static class AddInterceptorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void addInterceptor(
@Advice.FieldValue("interceptors") final List<ServerInterceptor> interceptors) {
boolean shouldRegister = true;
for (final ServerInterceptor interceptor : interceptors) {
if (interceptor instanceof TracingServerInterceptor) {
shouldRegister = false;
break;
}
}
if (shouldRegister) {
interceptors.add(0, new TracingServerInterceptor(GlobalTracer.get()));
}
}
}
}

View File

@ -0,0 +1,162 @@
package datadog.trace.instrumentation.grpc.server;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapExtractAdapter;
import io.opentracing.tag.Tags;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class TracingServerInterceptor implements ServerInterceptor {
private final Tracer tracer;
public TracingServerInterceptor(final Tracer tracer) {
this.tracer = tracer;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call,
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final Map<String, String> headerMap = new HashMap<>();
for (final String key : headers.keys()) {
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
final String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
headerMap.put(key, value);
}
}
final SpanContext spanContext =
tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headerMap));
final Tracer.SpanBuilder spanBuilder =
tracer
.buildSpan("grpc.server")
.withTag(DDTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName())
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER);
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
}
final Scope scope = spanBuilder.startActive(false);
final Span span = scope.span();
final ServerCall.Listener<ReqT> result;
try {
// call other interceptors
result = next.startCall(call, headers);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
} finally {
scope.close();
}
// This ensures the server implementation can see the span in scope
return new TracingServerCallListener<>(tracer, span, result);
}
static final class TracingServerCallListener<ReqT>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
final Tracer tracer;
final Span span;
TracingServerCallListener(
final Tracer tracer, final Span span, final ServerCall.Listener<ReqT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void onMessage(final ReqT message) {
final Scope scope =
tracer
.buildSpan("grpc.message")
.asChildOf(span)
.withTag("message.type", message.getClass().getName())
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
.startActive(true);
try {
delegate().onMessage(message);
} catch (final RuntimeException | Error e) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
this.span.log(Collections.singletonMap(ERROR_OBJECT, e));
this.span.finish();
throw e;
} finally {
scope.close();
}
}
@Override
public void onHalfClose() {
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
delegate().onHalfClose();
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
@Override
public void onCancel() {
// Finishes span.
try (final Scope ignored = tracer.scopeManager().activate(span, true)) {
delegate().onCancel();
span.setTag("canceled", true);
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
@Override
public void onComplete() {
// Finishes span.
try (final Scope ignored = tracer.scopeManager().activate(span, true)) {
delegate().onComplete();
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
@Override
public void onReady() {
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
delegate().onReady();
} catch (final RuntimeException | Error e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e));
span.finish();
throw e;
}
}
}
}

View File

@ -0,0 +1,174 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.ManagedChannel
import io.grpc.Server
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver
import io.opentracing.tag.Tags
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class GrpcStreamingTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.grpc.enabled", "true")
}
def "test conversation #name"() {
setup:
final msgCount = serverMessageCount
def serverReceived = new CopyOnWriteArrayList<>()
def clientReceived = new CopyOnWriteArrayList<>()
def error = new AtomicReference()
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
StreamObserver<Helloworld.Response> conversation(StreamObserver<Helloworld.Response> observer) {
return new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response value) {
serverReceived << value.message
(1..msgCount).each {
observer.onNext(value)
}
}
@Override
void onError(Throwable t) {
error.set(t)
observer.onError(t)
}
@Override
void onCompleted() {
observer.onCompleted()
}
}
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady()
when:
def observer = client.conversation(new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response value) {
clientReceived << value.message
}
@Override
void onError(Throwable t) {
error.set(t)
}
@Override
void onCompleted() {
TEST_WRITER.waitForTraces(1)
}
})
clientRange.each {
def message = Helloworld.Response.newBuilder().setMessage("call $it").build()
observer.onNext(message)
}
observer.onCompleted()
then:
error.get() == null
assertTraces(TEST_WRITER, 2) {
trace(0, clientMessageCount + 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.server"
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
childOf trace(1).get(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
defaultTags()
}
}
clientRange.each {
span(it) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Response"
defaultTags()
}
}
}
}
trace(1, (clientMessageCount * serverMessageCount) + 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.client"
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
parent()
errored false
tags {
"status.code" "OK"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
defaultTags()
}
}
(1..(clientMessageCount * serverMessageCount)).each {
span(it) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Response"
defaultTags()
}
}
}
}
}
serverReceived == clientRange.collect { "call $it" }
clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort()
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
where:
name | clientMessageCount | serverMessageCount
"A" | 1 | 1
"B" | 2 | 1
"C" | 1 | 2
"D" | 2 | 2
"E" | 3 | 3
clientRange = 1..clientMessageCount
serverRange = 1..serverMessageCount
}
}

View File

@ -0,0 +1,285 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.ManagedChannel
import io.grpc.Server
import io.grpc.Status
import io.grpc.StatusRuntimeException
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver
import io.opentracing.tag.Tags
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class GrpcTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.grpc.enabled", "true")
}
def "test request-response"() {
setup:
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
def response = client.sayHello(Helloworld.Request.newBuilder().setName(name).build())
then:
response.message == "Hello $name"
assertTraces(TEST_WRITER, 2) {
trace(0, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.server"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
childOf trace(1).get(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
defaultTags()
}
}
span(1) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Request"
defaultTags()
}
}
}
trace(1, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.client"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
parent()
errored false
tags {
"status.code" "OK"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
defaultTags()
}
}
span(1) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Response"
defaultTags()
}
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
where:
name << ["some name", "some other name"]
}
def "test error - #name"() {
setup:
def error = status.asException()
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
responseObserver.onError(error)
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
client.sayHello(Helloworld.Request.newBuilder().setName(name).build())
then:
thrown StatusRuntimeException
assertTraces(TEST_WRITER, 2) {
trace(0, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.server"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
childOf trace(1).get(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
defaultTags()
}
}
span(1) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Request"
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.client"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
parent()
errored true
tags {
"status.code" "${status.code.name()}"
"status.description" description
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
tag "error", true
defaultTags()
}
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
where:
name | status | description
"Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error")) | null
"Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error")) | null
"StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error")) | null
"Runtime - description" | Status.UNKNOWN.withDescription("some description") | "some description"
"Status - description" | Status.PERMISSION_DENIED.withDescription("some description") | "some description"
"StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") | "some description"
}
def "test error thrown - #name"() {
setup:
def error = status.asRuntimeException()
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
throw error
}
}
Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start()
ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build()
GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel)
when:
client.sayHello(Helloworld.Request.newBuilder().setName(name).build())
then:
thrown StatusRuntimeException
assertTraces(TEST_WRITER, 2) {
trace(0, 2) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.server"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
childOf trace(1).get(0)
errored true
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
errorTags error.class, error.message
defaultTags()
}
}
span(1) {
serviceName "unnamed-java-app"
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
"message.type" "example.Helloworld\$Request"
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "grpc.client"
resourceName "example.Greeter/SayHello"
spanType DDSpanTypes.RPC
parent()
errored true
tags {
"status.code" "UNKNOWN"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.RPC
tag "error", true
defaultTags()
}
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
where:
name | status
"Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error"))
"Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error"))
"StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error"))
"Runtime - description" | Status.UNKNOWN.withDescription("some description")
"Status - description" | Status.PERMISSION_DENIED.withDescription("some description")
"StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description")
}
}

View File

@ -0,0 +1,33 @@
package util
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ForwardingClientCall
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import java.util.concurrent.Phaser
/**
* Interceptor that blocks client from returning until server trace is reported.
*/
class BlockingInterceptor implements ClientInterceptor {
private final Phaser phaser
BlockingInterceptor(Phaser phaser) {
this.phaser = phaser
phaser.register()
}
@Override
<ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
@Override
void start(final ClientCall.Listener responseListener, final Metadata headers) {
super.start(new BlockingListener(responseListener, phaser), headers)
}
}
}
}

View File

@ -0,0 +1,23 @@
package util
import io.grpc.ClientCall
import io.grpc.ForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.Status
import java.util.concurrent.Phaser
class BlockingListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
private final Phaser phaser
BlockingListener(ClientCall.Listener<RespT> delegate, Phaser phaser) {
super(delegate)
this.phaser = phaser
}
@Override
void onClose(final Status status, final Metadata trailers) {
delegate().onClose(status, trailers)
phaser.arriveAndAwaitAdvance()
}
}

View File

@ -0,0 +1,19 @@
syntax = "proto3";
package example;
service Greeter {
rpc SayHello (Request) returns (Response) {
}
rpc Conversation (stream Response) returns (stream Response) {
}
}
message Request {
string name = 1;
}
message Response {
string message = 1;
}

View File

@ -1,5 +1,6 @@
package datadog.trace.agent.test
import datadog.opentracing.DDSpan
import datadog.trace.common.writer.ListWriter
import org.codehaus.groovy.runtime.powerassert.PowerAssertionError
import org.spockframework.runtime.Condition
@ -13,7 +14,7 @@ class ListWriterAssert {
private final int size
private final Set<Integer> assertedIndexes = new HashSet<>()
private ListWriterAssert(writer) {
private ListWriterAssert(ListWriter writer) {
this.writer = writer
size = writer.size()
}
@ -49,6 +50,10 @@ class ListWriterAssert {
}
}
List<DDSpan> trace(int index) {
return writer.get(index)
}
void trace(int index, int expectedSize,
@DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= size) {

View File

@ -44,6 +44,9 @@ class TagsAssert {
}
def tag(String name, value) {
if (value == null) {
return
}
assertedTags.add(name)
if (value instanceof Class) {
assert ((Class) value).isInstance(tags[name])

View File

@ -77,13 +77,12 @@ public abstract class AgentTestRunner extends Specification {
((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN);
((Logger) LoggerFactory.getLogger("datadog")).setLevel(Level.DEBUG);
WRITER_PHASER.register();
TEST_WRITER =
new ListWriter() {
@Override
public boolean add(final List<DDSpan> trace) {
final boolean result = super.add(trace);
WRITER_PHASER.arrive();
WRITER_PHASER.arriveAndDeregister();
return result;
}
};
@ -137,6 +136,7 @@ public abstract class AgentTestRunner extends Specification {
@Before
public void beforeTest() {
TEST_WRITER.start();
WRITER_PHASER.register();
INSTRUMENTATION_ERROR_COUNT.set(0);
ERROR_LISTENER.activateTest(this);
assert getTestTracer().activeSpan() == null;
@ -159,11 +159,11 @@ public abstract class AgentTestRunner extends Specification {
public static class ErrorCountingListener implements AgentBuilder.Listener {
private static final List<AgentTestRunner> activeTests = new CopyOnWriteArrayList<>();
public void activateTest(AgentTestRunner testRunner) {
public void activateTest(final AgentTestRunner testRunner) {
activeTests.add(testRunner);
}
public void deactivateTest(AgentTestRunner testRunner) {
public void deactivateTest(final AgentTestRunner testRunner) {
activeTests.remove(testRunner);
}
@ -198,7 +198,7 @@ public abstract class AgentTestRunner extends Specification {
final JavaModule module,
final boolean loaded,
final Throwable throwable) {
for (AgentTestRunner testRunner : activeTests) {
for (final AgentTestRunner testRunner : activeTests) {
if (testRunner.onInstrumentationError(typeName, classLoader, module, loaded, throwable)) {
INSTRUMENTATION_ERROR_COUNT.incrementAndGet();
break;

View File

@ -3,6 +3,7 @@ package datadog.trace.api;
public class DDSpanTypes {
public static final String HTTP_CLIENT = "http";
public static final String WEB_SERVLET = "web";
public static final String RPC = "rpc";
public static final String SQL = "sql";
public static final String MONGO = "mongodb";

View File

@ -16,6 +16,7 @@ tasks.withType(JavaCompile) {
options.compilerArgs += ['-Xep:FutureReturnValueIgnored:OFF']
// workaround for: https://github.com/google/error-prone/issues/780
options.compilerArgs += ['-Xep:ParameterName:OFF']
options.compilerArgs += ['-XepDisableWarningsInGeneratedCode']
}
apply plugin: "eclipse"

View File

@ -18,6 +18,7 @@ include ':dd-java-agent:instrumentation:elasticsearch-rest-5'
include ':dd-java-agent:instrumentation:elasticsearch-transport-2'
include ':dd-java-agent:instrumentation:elasticsearch-transport-5'
include ':dd-java-agent:instrumentation:elasticsearch-transport-6'
include ':dd-java-agent:instrumentation:grpc-1.5'
include ':dd-java-agent:instrumentation:http-url-connection'
include ':dd-java-agent:instrumentation:hystrix-1.4'
include ':dd-java-agent:instrumentation:jax-rs-annotations'