diff --git a/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle new file mode 100644 index 0000000000..1aa2301534 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle @@ -0,0 +1,74 @@ +apply plugin: 'version-scan' + +versionScan { + group = "io.grpc" + module = "grpc-core" + versions = "[1.5.0,)" + verifyPresent = [ + "io.grpc.InternalServerInterceptors": null, + ] +} + +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.6' + } +} + +apply from: "${rootDir}/gradle/java.gradle" +apply plugin: 'idea' +apply plugin: 'com.google.protobuf' + +def grpcVersion = '1.5.0' +protobuf { + protoc { + // Download compiler rather than using locally installed version: + artifact = 'com.google.protobuf:protoc:3.3.0' + } + plugins { + grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } + } + generateProtoTasks { + all()*.plugins { grpc {} } + } +} + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'io.grpc', name: 'grpc-core', version: grpcVersion + + compile project(':dd-trace-ot') + compile project(':dd-java-agent:agent-tooling') + + compile deps.bytebuddy + compile deps.opentracing + compile deps.autoservice + annotationProcessor deps.autoservice + implementation deps.autoservice + + testCompile project(':dd-java-agent:testing') + + testCompile group: 'io.grpc', name: 'grpc-netty', version: grpcVersion + testCompile group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion + testCompile group: 'io.grpc', name: 'grpc-stub', version: grpcVersion + + latestDepTestCompile sourceSets.test.output // include the protobuf generated classes +} + +configurations.latestDepTestCompile { + resolutionStrategy { + force group: 'io.grpc', name: 'grpc-netty', version: '+' + force group: 'io.grpc', name: 'grpc-protobuf', version: '+' + force group: 'io.grpc', name: 'grpc-stub', version: '+' + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientBuilderInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientBuilderInstrumentation.java new file mode 100644 index 0000000000..1185e6eb22 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcClientBuilderInstrumentation.java @@ -0,0 +1,72 @@ +package datadog.trace.instrumentation.grpc.client; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import io.grpc.ClientInterceptor; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class GrpcClientBuilderInstrumentation extends Instrumenter.Default { + + public GrpcClientBuilderInstrumentation() { + super("grpc", "grpc-client"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.internal.AbstractManagedChannelImplBuilder"); + } + + @Override + public ElementMatcher classLoaderMatcher() { + return classLoaderHasClasses("io.grpc.InternalServerInterceptors"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + "datadog.trace.instrumentation.grpc.client.GrpcInjectAdapter", + "datadog.trace.instrumentation.grpc.client.TracingClientInterceptor", + "datadog.trace.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCall", + "datadog.trace.instrumentation.grpc.client.TracingClientInterceptor$TracingClientCallListener", + }; + } + + @Override + public Map transformers() { + return Collections.singletonMap( + isMethod().and(named("build")), AddInterceptorAdvice.class.getName()); + } + + @Override + protected boolean defaultEnabled() { + return false; + } + + public static class AddInterceptorAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void addInterceptor( + @Advice.FieldValue("interceptors") final List interceptors) { + boolean shouldRegister = true; + for (final ClientInterceptor interceptor : interceptors) { + if (interceptor instanceof TracingClientInterceptor) { + shouldRegister = false; + break; + } + } + if (shouldRegister) { + interceptors.add(0, new TracingClientInterceptor(GlobalTracer.get())); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcInjectAdapter.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcInjectAdapter.java new file mode 100644 index 0000000000..1e4bf7434f --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/GrpcInjectAdapter.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.grpc.client; + +import io.grpc.Metadata; +import io.opentracing.propagation.TextMap; +import java.util.Iterator; +import java.util.Map; + +public final class GrpcInjectAdapter implements TextMap { + private final Metadata metadata; + + public GrpcInjectAdapter(final Metadata metadata) { + this.metadata = metadata; + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException( + "GrpcInjectAdapter should only be used with Tracer.inject()"); + } + + @Override + public void put(final String key, final String value) { + this.metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java new file mode 100644 index 0000000000..b3ea2c083c --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java @@ -0,0 +1,171 @@ +package datadog.trace.instrumentation.grpc.client; + +import static io.opentracing.log.Fields.ERROR_OBJECT; + +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import java.util.Collections; + +public class TracingClientInterceptor implements ClientInterceptor { + + private final Tracer tracer; + + public TracingClientInterceptor(final Tracer tracer) { + this.tracer = tracer; + } + + @Override + public ClientCall interceptCall( + final MethodDescriptor method, + final CallOptions callOptions, + final Channel next) { + + final Scope scope = + tracer + .buildSpan("grpc.client") + .withTag(DDTags.RESOURCE_NAME, method.getFullMethodName()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .startActive(false); + final Span span = scope.span(); + + final ClientCall result; + try { + // call other interceptors + result = next.newCall(method, callOptions); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } finally { + scope.close(); + } + + return new TracingClientCall<>(tracer, span, result); + } + + static final class TracingClientCall + extends ForwardingClientCall.SimpleForwardingClientCall { + final Tracer tracer; + final Span span; + + TracingClientCall( + final Tracer tracer, final Span span, final ClientCall delegate) { + super(delegate); + this.tracer = tracer; + this.span = span; + } + + @Override + public void start(final Listener responseListener, final Metadata headers) { + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers)); + + try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + + @Override + public void sendMessage(final ReqT message) { + try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + super.sendMessage(message); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + } + + static final class TracingClientCallListener + extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + final Tracer tracer; + final Span span; + + TracingClientCallListener( + final Tracer tracer, final Span span, final ClientCall.Listener delegate) { + super(delegate); + this.tracer = tracer; + this.span = span; + } + + @Override + public void onMessage(final RespT message) { + final Scope scope = + tracer + .buildSpan("grpc.message") + .asChildOf(span) + .withTag("message.type", message.getClass().getName()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .startActive(true); + try { + delegate().onMessage(message); + } catch (final RuntimeException | Error e) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + this.span.log(Collections.singletonMap(ERROR_OBJECT, e)); + this.span.finish(); + throw e; + } finally { + scope.close(); + } + } + + @Override + public void onClose(final Status status, final Metadata trailers) { + span.setTag("status.code", status.getCode().name()); + if (status.getDescription() != null) { + span.setTag("status.description", status.getDescription()); + } + if (!status.isOk()) { + Tags.ERROR.set(span, true); + } + if (status.getCause() != null) { + span.log(Collections.singletonMap(ERROR_OBJECT, status.getCause())); + } + // Finishes span. + try (final Scope ignored = tracer.scopeManager().activate(span, true)) { + delegate().onClose(status, trailers); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + + @Override + public void onReady() { + try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + delegate().onReady(); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerBuilderInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerBuilderInstrumentation.java new file mode 100644 index 0000000000..37ff1994c9 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/GrpcServerBuilderInstrumentation.java @@ -0,0 +1,70 @@ +package datadog.trace.instrumentation.grpc.server; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import io.grpc.ServerInterceptor; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class GrpcServerBuilderInstrumentation extends Instrumenter.Default { + + public GrpcServerBuilderInstrumentation() { + super("grpc", "grpc-server"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.internal.AbstractServerImplBuilder"); + } + + @Override + public ElementMatcher classLoaderMatcher() { + return classLoaderHasClasses("io.grpc.InternalServerInterceptors"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + "datadog.trace.instrumentation.grpc.server.TracingServerInterceptor", + "datadog.trace.instrumentation.grpc.server.TracingServerInterceptor$TracingServerCallListener", + }; + } + + @Override + public Map transformers() { + return Collections.singletonMap( + isMethod().and(named("build")), AddInterceptorAdvice.class.getName()); + } + + @Override + protected boolean defaultEnabled() { + return false; + } + + public static class AddInterceptorAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void addInterceptor( + @Advice.FieldValue("interceptors") final List interceptors) { + boolean shouldRegister = true; + for (final ServerInterceptor interceptor : interceptors) { + if (interceptor instanceof TracingServerInterceptor) { + shouldRegister = false; + break; + } + } + if (shouldRegister) { + interceptors.add(0, new TracingServerInterceptor(GlobalTracer.get())); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java new file mode 100644 index 0000000000..bc70102f3b --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -0,0 +1,162 @@ +package datadog.trace.instrumentation.grpc.server; + +import static io.opentracing.log.Fields.ERROR_OBJECT; + +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapExtractAdapter; +import io.opentracing.tag.Tags; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TracingServerInterceptor implements ServerInterceptor { + + private final Tracer tracer; + + public TracingServerInterceptor(final Tracer tracer) { + this.tracer = tracer; + } + + @Override + public ServerCall.Listener interceptCall( + final ServerCall call, + final Metadata headers, + final ServerCallHandler next) { + + final Map headerMap = new HashMap<>(); + for (final String key : headers.keys()) { + if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + final String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + headerMap.put(key, value); + } + } + final SpanContext spanContext = + tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headerMap)); + + final Tracer.SpanBuilder spanBuilder = + tracer + .buildSpan("grpc.server") + .withTag(DDTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER); + if (spanContext != null) { + spanBuilder.asChildOf(spanContext); + } + final Scope scope = spanBuilder.startActive(false); + final Span span = scope.span(); + + final ServerCall.Listener result; + try { + // call other interceptors + result = next.startCall(call, headers); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } finally { + scope.close(); + } + + // This ensures the server implementation can see the span in scope + return new TracingServerCallListener<>(tracer, span, result); + } + + static final class TracingServerCallListener + extends ForwardingServerCallListener.SimpleForwardingServerCallListener { + final Tracer tracer; + final Span span; + + TracingServerCallListener( + final Tracer tracer, final Span span, final ServerCall.Listener delegate) { + super(delegate); + this.tracer = tracer; + this.span = span; + } + + @Override + public void onMessage(final ReqT message) { + final Scope scope = + tracer + .buildSpan("grpc.message") + .asChildOf(span) + .withTag("message.type", message.getClass().getName()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER) + .startActive(true); + try { + delegate().onMessage(message); + } catch (final RuntimeException | Error e) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + this.span.log(Collections.singletonMap(ERROR_OBJECT, e)); + this.span.finish(); + throw e; + } finally { + scope.close(); + } + } + + @Override + public void onHalfClose() { + try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + delegate().onHalfClose(); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + + @Override + public void onCancel() { + // Finishes span. + try (final Scope ignored = tracer.scopeManager().activate(span, true)) { + delegate().onCancel(); + span.setTag("canceled", true); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + + @Override + public void onComplete() { + // Finishes span. + try (final Scope ignored = tracer.scopeManager().activate(span, true)) { + delegate().onComplete(); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + + @Override + public void onReady() { + try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + delegate().onReady(); + } catch (final RuntimeException | Error e) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, e)); + span.finish(); + throw e; + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy new file mode 100644 index 0000000000..5530d7d1de --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -0,0 +1,174 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import example.GreeterGrpc +import example.Helloworld +import io.grpc.BindableService +import io.grpc.ManagedChannel +import io.grpc.Server +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import io.grpc.stub.StreamObserver +import io.opentracing.tag.Tags + +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import static datadog.trace.agent.test.ListWriterAssert.assertTraces + +class GrpcStreamingTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.grpc.enabled", "true") + } + + def "test conversation #name"() { + setup: + final msgCount = serverMessageCount + def serverReceived = new CopyOnWriteArrayList<>() + def clientReceived = new CopyOnWriteArrayList<>() + def error = new AtomicReference() + + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + StreamObserver conversation(StreamObserver observer) { + return new StreamObserver() { + @Override + void onNext(Helloworld.Response value) { + serverReceived << value.message + + (1..msgCount).each { + observer.onNext(value) + } + } + + @Override + void onError(Throwable t) { + error.set(t) + observer.onError(t) + } + + @Override + void onCompleted() { + observer.onCompleted() + } + } + } + } + Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() + + ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady() + + when: + def observer = client.conversation(new StreamObserver() { + @Override + void onNext(Helloworld.Response value) { + clientReceived << value.message + } + + @Override + void onError(Throwable t) { + error.set(t) + } + + @Override + void onCompleted() { + TEST_WRITER.waitForTraces(1) + } + }) + + clientRange.each { + def message = Helloworld.Response.newBuilder().setMessage("call $it").build() + observer.onNext(message) + } + observer.onCompleted() + + then: + error.get() == null + + assertTraces(TEST_WRITER, 2) { + trace(0, clientMessageCount + 1) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.server" + resourceName "example.Greeter/Conversation" + spanType DDSpanTypes.RPC + childOf trace(1).get(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + defaultTags() + } + } + clientRange.each { + span(it) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Response" + defaultTags() + } + } + } + } + trace(1, (clientMessageCount * serverMessageCount) + 1) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.client" + resourceName "example.Greeter/Conversation" + spanType DDSpanTypes.RPC + parent() + errored false + tags { + "status.code" "OK" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + defaultTags() + } + } + (1..(clientMessageCount * serverMessageCount)).each { + span(it) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Response" + defaultTags() + } + } + } + } + } + + serverReceived == clientRange.collect { "call $it" } + clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + + where: + name | clientMessageCount | serverMessageCount + "A" | 1 | 1 + "B" | 2 | 1 + "C" | 1 | 2 + "D" | 2 | 2 + "E" | 3 | 3 + + clientRange = 1..clientMessageCount + serverRange = 1..serverMessageCount + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy new file mode 100644 index 0000000000..14cbee01c7 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -0,0 +1,285 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import example.GreeterGrpc +import example.Helloworld +import io.grpc.BindableService +import io.grpc.ManagedChannel +import io.grpc.Server +import io.grpc.Status +import io.grpc.StatusRuntimeException +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import io.grpc.stub.StreamObserver +import io.opentracing.tag.Tags + +import java.util.concurrent.TimeUnit + +import static datadog.trace.agent.test.ListWriterAssert.assertTraces + +class GrpcTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.grpc.enabled", "true") + } + + def "test request-response"() { + setup: + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() + + ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) + + when: + def response = client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + + then: + response.message == "Hello $name" + assertTraces(TEST_WRITER, 2) { + trace(0, 2) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.server" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + childOf trace(1).get(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + defaultTags() + } + } + span(1) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Request" + defaultTags() + } + } + } + trace(1, 2) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.client" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + parent() + errored false + tags { + "status.code" "OK" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + defaultTags() + } + } + span(1) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Response" + defaultTags() + } + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + + where: + name << ["some name", "some other name"] + } + + def "test error - #name"() { + setup: + def error = status.asException() + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + responseObserver.onError(error) + } + } + Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() + + ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) + + when: + client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + + then: + thrown StatusRuntimeException + + assertTraces(TEST_WRITER, 2) { + trace(0, 2) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.server" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + childOf trace(1).get(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + defaultTags() + } + } + span(1) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Request" + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.client" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + parent() + errored true + tags { + "status.code" "${status.code.name()}" + "status.description" description + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + tag "error", true + defaultTags() + } + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + + where: + name | status | description + "Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error")) | null + "Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error")) | null + "StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error")) | null + "Runtime - description" | Status.UNKNOWN.withDescription("some description") | "some description" + "Status - description" | Status.PERMISSION_DENIED.withDescription("some description") | "some description" + "StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") | "some description" + } + + def "test error thrown - #name"() { + setup: + def error = status.asRuntimeException() + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + throw error + } + } + Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() + + ManagedChannel channel = InProcessChannelBuilder.forName(getClass().name).build() + GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) + + when: + client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + + then: + thrown StatusRuntimeException + + assertTraces(TEST_WRITER, 2) { + trace(0, 2) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.server" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + childOf trace(1).get(0) + errored true + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + errorTags error.class, error.message + defaultTags() + } + } + span(1) { + serviceName "unnamed-java-app" + operationName "grpc.message" + resourceName "grpc.message" + spanType DDSpanTypes.RPC + childOf span(0) + errored false + tags { + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_SERVER + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + "message.type" "example.Helloworld\$Request" + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "unnamed-java-app" + operationName "grpc.client" + resourceName "example.Greeter/SayHello" + spanType DDSpanTypes.RPC + parent() + errored true + tags { + "status.code" "UNKNOWN" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.RPC + tag "error", true + defaultTags() + } + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + + where: + name | status + "Runtime - cause" | Status.UNKNOWN.withCause(new RuntimeException("some error")) + "Status - cause" | Status.PERMISSION_DENIED.withCause(new RuntimeException("some error")) + "StatusRuntime - cause" | Status.UNIMPLEMENTED.withCause(new RuntimeException("some error")) + "Runtime - description" | Status.UNKNOWN.withDescription("some description") + "Status - description" | Status.PERMISSION_DENIED.withDescription("some description") + "StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy new file mode 100644 index 0000000000..deaf39decf --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy @@ -0,0 +1,33 @@ +package util + +import io.grpc.CallOptions +import io.grpc.Channel +import io.grpc.ClientCall +import io.grpc.ClientInterceptor +import io.grpc.ForwardingClientCall +import io.grpc.Metadata +import io.grpc.MethodDescriptor + +import java.util.concurrent.Phaser + +/** + * Interceptor that blocks client from returning until server trace is reported. + */ +class BlockingInterceptor implements ClientInterceptor { + private final Phaser phaser + + BlockingInterceptor(Phaser phaser) { + this.phaser = phaser + phaser.register() + } + + @Override + ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + void start(final ClientCall.Listener responseListener, final Metadata headers) { + super.start(new BlockingListener(responseListener, phaser), headers) + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy new file mode 100644 index 0000000000..691f887b2c --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy @@ -0,0 +1,23 @@ +package util + +import io.grpc.ClientCall +import io.grpc.ForwardingClientCallListener +import io.grpc.Metadata +import io.grpc.Status + +import java.util.concurrent.Phaser + +class BlockingListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + private final Phaser phaser + + BlockingListener(ClientCall.Listener delegate, Phaser phaser) { + super(delegate) + this.phaser = phaser + } + + @Override + void onClose(final Status status, final Metadata trailers) { + delegate().onClose(status, trailers) + phaser.arriveAndAwaitAdvance() + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/proto/helloworld.proto b/dd-java-agent/instrumentation/grpc-1.5/src/test/proto/helloworld.proto new file mode 100644 index 0000000000..412aec0111 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/proto/helloworld.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package example; + +service Greeter { + rpc SayHello (Request) returns (Response) { + } + + rpc Conversation (stream Response) returns (stream Response) { + } +} + +message Request { + string name = 1; +} + +message Response { + string message = 1; +} diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy index 0cd127873b..72a5bf969a 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy @@ -1,5 +1,6 @@ package datadog.trace.agent.test +import datadog.opentracing.DDSpan import datadog.trace.common.writer.ListWriter import org.codehaus.groovy.runtime.powerassert.PowerAssertionError import org.spockframework.runtime.Condition @@ -13,7 +14,7 @@ class ListWriterAssert { private final int size private final Set assertedIndexes = new HashSet<>() - private ListWriterAssert(writer) { + private ListWriterAssert(ListWriter writer) { this.writer = writer size = writer.size() } @@ -49,6 +50,10 @@ class ListWriterAssert { } } + List trace(int index) { + return writer.get(index) + } + void trace(int index, int expectedSize, @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy index 3e0b5e93a1..fc8233c848 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy @@ -44,6 +44,9 @@ class TagsAssert { } def tag(String name, value) { + if (value == null) { + return + } assertedTags.add(name) if (value instanceof Class) { assert ((Class) value).isInstance(tags[name]) diff --git a/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java b/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java index f5c417ca57..a24c4e8a5b 100644 --- a/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java +++ b/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java @@ -77,13 +77,12 @@ public abstract class AgentTestRunner extends Specification { ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); ((Logger) LoggerFactory.getLogger("datadog")).setLevel(Level.DEBUG); - WRITER_PHASER.register(); TEST_WRITER = new ListWriter() { @Override public boolean add(final List trace) { final boolean result = super.add(trace); - WRITER_PHASER.arrive(); + WRITER_PHASER.arriveAndDeregister(); return result; } }; @@ -137,6 +136,7 @@ public abstract class AgentTestRunner extends Specification { @Before public void beforeTest() { TEST_WRITER.start(); + WRITER_PHASER.register(); INSTRUMENTATION_ERROR_COUNT.set(0); ERROR_LISTENER.activateTest(this); assert getTestTracer().activeSpan() == null; @@ -159,11 +159,11 @@ public abstract class AgentTestRunner extends Specification { public static class ErrorCountingListener implements AgentBuilder.Listener { private static final List activeTests = new CopyOnWriteArrayList<>(); - public void activateTest(AgentTestRunner testRunner) { + public void activateTest(final AgentTestRunner testRunner) { activeTests.add(testRunner); } - public void deactivateTest(AgentTestRunner testRunner) { + public void deactivateTest(final AgentTestRunner testRunner) { activeTests.remove(testRunner); } @@ -198,7 +198,7 @@ public abstract class AgentTestRunner extends Specification { final JavaModule module, final boolean loaded, final Throwable throwable) { - for (AgentTestRunner testRunner : activeTests) { + for (final AgentTestRunner testRunner : activeTests) { if (testRunner.onInstrumentationError(typeName, classLoader, module, loaded, throwable)) { INSTRUMENTATION_ERROR_COUNT.incrementAndGet(); break; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java index ab20726232..0243134634 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java @@ -3,6 +3,7 @@ package datadog.trace.api; public class DDSpanTypes { public static final String HTTP_CLIENT = "http"; public static final String WEB_SERVLET = "web"; + public static final String RPC = "rpc"; public static final String SQL = "sql"; public static final String MONGO = "mongodb"; diff --git a/gradle/java.gradle b/gradle/java.gradle index a56dbb4228..522a4fb34b 100644 --- a/gradle/java.gradle +++ b/gradle/java.gradle @@ -16,6 +16,7 @@ tasks.withType(JavaCompile) { options.compilerArgs += ['-Xep:FutureReturnValueIgnored:OFF'] // workaround for: https://github.com/google/error-prone/issues/780 options.compilerArgs += ['-Xep:ParameterName:OFF'] + options.compilerArgs += ['-XepDisableWarningsInGeneratedCode'] } apply plugin: "eclipse" diff --git a/settings.gradle b/settings.gradle index c4f843e258..c0cf81d488 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,7 @@ include ':dd-java-agent:instrumentation:elasticsearch-rest-5' include ':dd-java-agent:instrumentation:elasticsearch-transport-2' include ':dd-java-agent:instrumentation:elasticsearch-transport-5' include ':dd-java-agent:instrumentation:elasticsearch-transport-6' +include ':dd-java-agent:instrumentation:grpc-1.5' include ':dd-java-agent:instrumentation:http-url-connection' include ':dd-java-agent:instrumentation:hystrix-1.4' include ':dd-java-agent:instrumentation:jax-rs-annotations'