diff --git a/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle index 4141e1c519..4c79ae255b 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle +++ b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle @@ -47,6 +47,9 @@ dependencies { testCompile group: 'io.grpc', name: 'grpc-netty', version: grpcVersion testCompile group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion testCompile group: 'io.grpc', name: 'grpc-stub', version: grpcVersion + testCompile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2' + + testCompile project(':dd-java-agent:instrumentation:java-concurrent') latestDepTestCompile sourceSets.test.output // include the protobuf generated classes latestDepTestCompile group: 'io.grpc', name: 'grpc-netty', version: '+' diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java index 5102dda936..0a61774c46 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java @@ -64,7 +64,7 @@ public class TracingClientInterceptor implements ClientInterceptor { propagate().inject(span, headers, SETTER); try (final AgentScope scope = activateSpan(span, false)) { - scope.setAsyncPropagation(true); + // Don't async propagate otherwise the span gets tied up with a timeout handler. super.start(new TracingClientCallListener<>(span, responseListener), headers); } catch (final Throwable e) { DECORATE.onError(span, e); diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java new file mode 100644 index 0000000000..dba22ed9dd --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java @@ -0,0 +1,57 @@ +package datadog.trace.instrumentation.grpc.server; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.context.TraceScope; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** + * The InProcessTransport calls the client response in process, so we have to disable async + * propagation to allow spans to complete and be reported properly. + */ +@AutoService(Instrumenter.class) +public class InProcessServerStreamInstrumentation extends Instrumenter.Default { + + public InProcessServerStreamInstrumentation() { + super("grpc", "grpc-server"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream"); + } + + @Override + public Map, String> transformers() { + return singletonMap(isPublic(), getClass().getName() + "$DisableAsyncPropagationAdvice"); + } + + public static class DisableAsyncPropagationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static TraceScope enter() { + final TraceScope scope = activeScope(); + if (scope != null && scope.isAsyncPropagating()) { + scope.setAsyncPropagation(false); + return scope; + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Enter final TraceScope scopeToReenable) { + if (scopeToReenable != null) { + scopeToReenable.setAsyncPropagation(true); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index 57cb0e8cc7..24e0dbf614 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -74,9 +74,9 @@ public class TracingServerInterceptor implements ServerInterceptor { public void close(final Status status, final Metadata trailers) { DECORATE.onClose(span, status); try (final AgentScope scope = activateSpan(span, false)) { - scope.setAsyncPropagation(true); + // Using async propagate here breaks the tests which use InProcessTransport. + // It also seems logical to not need it at all, so removing it. delegate().close(status, trailers); - scope.setAsyncPropagation(false); } catch (final Throwable e) { DECORATE.onError(span, e); throw e; diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index 1215eec134..b852a1ba5b 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -1,3 +1,4 @@ +import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.bootstrap.instrumentation.api.Tags @@ -29,22 +30,35 @@ class GrpcStreamingTest extends AgentTestRunner { return new StreamObserver() { @Override void onNext(Helloworld.Response value) { + serverReceived << value.message (1..msgCount).each { - observer.onNext(value) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + observer.onNext(value) + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } } @Override void onError(Throwable t) { - error.set(t) - observer.onError(t) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + error.set(t) + observer.onError(t) + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } @Override void onCompleted() { - observer.onCompleted() + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + observer.onCompleted() + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } } } @@ -58,17 +72,29 @@ class GrpcStreamingTest extends AgentTestRunner { def observer = client.conversation(new StreamObserver() { @Override void onNext(Helloworld.Response value) { - clientReceived << value.message + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + clientReceived << value.message + } else { + error.set(new IllegalStateException("not async propagating!")) + } } @Override void onError(Throwable t) { - error.set(t) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + error.set(t) + } else { + error.set(new IllegalStateException("not async propagating!")) + } } @Override void onCompleted() { - TEST_WRITER.waitForTraces(1) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + TEST_WRITER.waitForTraces(1) + } else { + error.set(new IllegalStateException("not async propagating!")) + } } }) @@ -80,6 +106,10 @@ class GrpcStreamingTest extends AgentTestRunner { then: error.get() == null + TEST_WRITER.waitForTraces(2) + error.get() == null + serverReceived == clientRange.collect { "call $it" } + clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() assertTraces(2) { trace(0, clientMessageCount + 1) { @@ -148,9 +178,6 @@ class GrpcStreamingTest extends AgentTestRunner { } } - serverReceived == clientRange.collect { "call $it" } - clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() - cleanup: channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) server?.shutdownNow()?.awaitTermination() diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy index fc1dda5842..2ac0245f34 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -1,3 +1,4 @@ +import datadog.common.exec.CommonTaskExecutor import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.bootstrap.instrumentation.api.Tags @@ -16,6 +17,9 @@ import io.grpc.stub.StreamObserver import java.util.concurrent.TimeUnit +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + class GrpcTest extends AgentTestRunner { def "test request-response"() { @@ -25,8 +29,14 @@ class GrpcTest extends AgentTestRunner { void sayHello( final Helloworld.Request req, final StreamObserver responseObserver) { final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() + CommonTaskExecutor.INSTANCE.execute { + if (testTracer.activeSpan() == null) { + responseObserver.onError(new IllegalStateException("no active span")) + } else { + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } } } Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() @@ -35,7 +45,11 @@ class GrpcTest extends AgentTestRunner { GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) when: - def response = client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + def response = runUnderTrace("parent") { + def resp = client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + TEST_WRITER.waitForTraces(1) // Wait for the server span to be reported. + return resp + } then: response.message == "Hello $name" @@ -46,7 +60,7 @@ class GrpcTest extends AgentTestRunner { operationName "grpc.server" resourceName "example.Greeter/SayHello" spanType DDSpanTypes.RPC - childOf trace(1).get(0) + childOf trace(1).get(1) errored false tags { "$Tags.COMPONENT" "grpc-server" @@ -70,13 +84,14 @@ class GrpcTest extends AgentTestRunner { } } } - trace(1, 2) { - span(0) { + trace(1, 3) { + basicSpan(it, 0, "parent") + span(1) { serviceName "unnamed-java-app" operationName "grpc.client" resourceName "example.Greeter/SayHello" spanType DDSpanTypes.RPC - parent() + childOf span(0) errored false tags { "$Tags.COMPONENT" "grpc-client" @@ -85,12 +100,12 @@ class GrpcTest extends AgentTestRunner { defaultTags() } } - span(1) { + span(2) { serviceName "unnamed-java-app" operationName "grpc.message" resourceName "grpc.message" spanType DDSpanTypes.RPC - childOf span(0) + childOf span(1) errored false tags { "$Tags.COMPONENT" "grpc-client" diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy deleted file mode 100644 index deaf39decf..0000000000 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy +++ /dev/null @@ -1,33 +0,0 @@ -package util - -import io.grpc.CallOptions -import io.grpc.Channel -import io.grpc.ClientCall -import io.grpc.ClientInterceptor -import io.grpc.ForwardingClientCall -import io.grpc.Metadata -import io.grpc.MethodDescriptor - -import java.util.concurrent.Phaser - -/** - * Interceptor that blocks client from returning until server trace is reported. - */ -class BlockingInterceptor implements ClientInterceptor { - private final Phaser phaser - - BlockingInterceptor(Phaser phaser) { - this.phaser = phaser - phaser.register() - } - - @Override - ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - void start(final ClientCall.Listener responseListener, final Metadata headers) { - super.start(new BlockingListener(responseListener, phaser), headers) - } - } - } -} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy deleted file mode 100644 index 691f887b2c..0000000000 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy +++ /dev/null @@ -1,23 +0,0 @@ -package util - -import io.grpc.ClientCall -import io.grpc.ForwardingClientCallListener -import io.grpc.Metadata -import io.grpc.Status - -import java.util.concurrent.Phaser - -class BlockingListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { - private final Phaser phaser - - BlockingListener(ClientCall.Listener delegate, Phaser phaser) { - super(delegate) - this.phaser = phaser - } - - @Override - void onClose(final Status status, final Metadata trailers) { - delegate().onClose(status, trailers) - phaser.arriveAndAwaitAdvance() - } -}