From cea0bc8a5211c0096378d7cbd62648bd0d0a7fa5 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 5 Mar 2020 15:57:26 -0800 Subject: [PATCH 1/4] Fix grpc tests with java-concurrent. --- .../instrumentation/grpc-1.5/grpc-1.5.gradle | 3 + .../grpc/server/TracingServerInterceptor.java | 4 +- .../src/test/groovy/GrpcStreamingTest.groovy | 57 +++++++++++++++---- .../grpc-1.5/src/test/groovy/GrpcTest.groovy | 33 ++++++++--- .../groovy/util/BlockingInterceptor.groovy | 33 ----------- .../test/groovy/util/BlockingListener.groovy | 23 -------- 6 files changed, 76 insertions(+), 77 deletions(-) delete mode 100644 dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy delete mode 100644 dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy diff --git a/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle index 4141e1c519..4c79ae255b 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle +++ b/dd-java-agent/instrumentation/grpc-1.5/grpc-1.5.gradle @@ -47,6 +47,9 @@ dependencies { testCompile group: 'io.grpc', name: 'grpc-netty', version: grpcVersion testCompile group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion testCompile group: 'io.grpc', name: 'grpc-stub', version: grpcVersion + testCompile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2' + + testCompile project(':dd-java-agent:instrumentation:java-concurrent') latestDepTestCompile sourceSets.test.output // include the protobuf generated classes latestDepTestCompile group: 'io.grpc', name: 'grpc-netty', version: '+' diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index 57cb0e8cc7..24e0dbf614 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -74,9 +74,9 @@ public class TracingServerInterceptor implements ServerInterceptor { public void close(final Status status, final Metadata trailers) { DECORATE.onClose(span, status); try (final AgentScope scope = activateSpan(span, false)) { - scope.setAsyncPropagation(true); + // Using async propagate here breaks the tests which use InProcessTransport. + // It also seems logical to not need it at all, so removing it. delegate().close(status, trailers); - scope.setAsyncPropagation(false); } catch (final Throwable e) { DECORATE.onError(span, e); throw e; diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index 1215eec134..f477bc58ab 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -1,3 +1,4 @@ +import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.bootstrap.instrumentation.api.Tags @@ -9,6 +10,7 @@ import io.grpc.Server import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder import io.grpc.stub.StreamObserver +import io.opentracing.noop.NoopSpan import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit @@ -29,22 +31,44 @@ class GrpcStreamingTest extends AgentTestRunner { return new StreamObserver() { @Override void onNext(Helloworld.Response value) { + serverReceived << value.message (1..msgCount).each { - observer.onNext(value) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + // The InProcessTransport calls the client response in process, so we have to disable async propagation. + def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) + observer.onNext(value) + tempScope.close() + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } } @Override void onError(Throwable t) { - error.set(t) - observer.onError(t) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + // The InProcessTransport calls the client response in process, so we have to disable async propagation. + def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) + error.set(t) + observer.onError(t) + tempScope.close() + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } @Override void onCompleted() { - observer.onCompleted() + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + // The InProcessTransport calls the client response in process, so we have to disable async propagation. + def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) + observer.onCompleted() + tempScope.close() + } else { + observer.onError(new IllegalStateException("not async propagating!")) + } } } } @@ -58,17 +82,29 @@ class GrpcStreamingTest extends AgentTestRunner { def observer = client.conversation(new StreamObserver() { @Override void onNext(Helloworld.Response value) { - clientReceived << value.message + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + clientReceived << value.message + } else { + error.set(new IllegalStateException("not async propagating!")) + } } @Override void onError(Throwable t) { - error.set(t) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + error.set(t) + } else { + error.set(new IllegalStateException("not async propagating!")) + } } @Override void onCompleted() { - TEST_WRITER.waitForTraces(1) + if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { + TEST_WRITER.waitForTraces(1) + } else { + error.set(new IllegalStateException("not async propagating!")) + } } }) @@ -80,6 +116,10 @@ class GrpcStreamingTest extends AgentTestRunner { then: error.get() == null + TEST_WRITER.waitForTraces(2) + error.get() == null + serverReceived == clientRange.collect { "call $it" } + clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() assertTraces(2) { trace(0, clientMessageCount + 1) { @@ -148,9 +188,6 @@ class GrpcStreamingTest extends AgentTestRunner { } } - serverReceived == clientRange.collect { "call $it" } - clientReceived == serverRange.collect { clientRange.collect { "call $it" } }.flatten().sort() - cleanup: channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) server?.shutdownNow()?.awaitTermination() diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy index fc1dda5842..2ac0245f34 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -1,3 +1,4 @@ +import datadog.common.exec.CommonTaskExecutor import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.DDSpanTypes import datadog.trace.bootstrap.instrumentation.api.Tags @@ -16,6 +17,9 @@ import io.grpc.stub.StreamObserver import java.util.concurrent.TimeUnit +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + class GrpcTest extends AgentTestRunner { def "test request-response"() { @@ -25,8 +29,14 @@ class GrpcTest extends AgentTestRunner { void sayHello( final Helloworld.Request req, final StreamObserver responseObserver) { final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() - responseObserver.onNext(reply) - responseObserver.onCompleted() + CommonTaskExecutor.INSTANCE.execute { + if (testTracer.activeSpan() == null) { + responseObserver.onError(new IllegalStateException("no active span")) + } else { + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } } } Server server = InProcessServerBuilder.forName(getClass().name).addService(greeter).directExecutor().build().start() @@ -35,7 +45,11 @@ class GrpcTest extends AgentTestRunner { GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(channel) when: - def response = client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + def response = runUnderTrace("parent") { + def resp = client.sayHello(Helloworld.Request.newBuilder().setName(name).build()) + TEST_WRITER.waitForTraces(1) // Wait for the server span to be reported. + return resp + } then: response.message == "Hello $name" @@ -46,7 +60,7 @@ class GrpcTest extends AgentTestRunner { operationName "grpc.server" resourceName "example.Greeter/SayHello" spanType DDSpanTypes.RPC - childOf trace(1).get(0) + childOf trace(1).get(1) errored false tags { "$Tags.COMPONENT" "grpc-server" @@ -70,13 +84,14 @@ class GrpcTest extends AgentTestRunner { } } } - trace(1, 2) { - span(0) { + trace(1, 3) { + basicSpan(it, 0, "parent") + span(1) { serviceName "unnamed-java-app" operationName "grpc.client" resourceName "example.Greeter/SayHello" spanType DDSpanTypes.RPC - parent() + childOf span(0) errored false tags { "$Tags.COMPONENT" "grpc-client" @@ -85,12 +100,12 @@ class GrpcTest extends AgentTestRunner { defaultTags() } } - span(1) { + span(2) { serviceName "unnamed-java-app" operationName "grpc.message" resourceName "grpc.message" spanType DDSpanTypes.RPC - childOf span(0) + childOf span(1) errored false tags { "$Tags.COMPONENT" "grpc-client" diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy deleted file mode 100644 index deaf39decf..0000000000 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingInterceptor.groovy +++ /dev/null @@ -1,33 +0,0 @@ -package util - -import io.grpc.CallOptions -import io.grpc.Channel -import io.grpc.ClientCall -import io.grpc.ClientInterceptor -import io.grpc.ForwardingClientCall -import io.grpc.Metadata -import io.grpc.MethodDescriptor - -import java.util.concurrent.Phaser - -/** - * Interceptor that blocks client from returning until server trace is reported. - */ -class BlockingInterceptor implements ClientInterceptor { - private final Phaser phaser - - BlockingInterceptor(Phaser phaser) { - this.phaser = phaser - phaser.register() - } - - @Override - ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - void start(final ClientCall.Listener responseListener, final Metadata headers) { - super.start(new BlockingListener(responseListener, phaser), headers) - } - } - } -} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy deleted file mode 100644 index 691f887b2c..0000000000 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/util/BlockingListener.groovy +++ /dev/null @@ -1,23 +0,0 @@ -package util - -import io.grpc.ClientCall -import io.grpc.ForwardingClientCallListener -import io.grpc.Metadata -import io.grpc.Status - -import java.util.concurrent.Phaser - -class BlockingListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { - private final Phaser phaser - - BlockingListener(ClientCall.Listener delegate, Phaser phaser) { - super(delegate) - this.phaser = phaser - } - - @Override - void onClose(final Status status, final Metadata trailers) { - delegate().onClose(status, trailers) - phaser.arriveAndAwaitAdvance() - } -} From 2e86ca21414c6bc7bd7f1b2e386c7ce829a49527 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 6 Mar 2020 09:20:00 -0800 Subject: [PATCH 2/4] disable async propagation for InProcess communication. --- .../InProcessServerStreamInstrumentation.java | 53 +++++++++++++++++++ .../src/test/groovy/GrpcStreamingTest.groovy | 7 --- 2 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java new file mode 100644 index 0000000000..439684e488 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java @@ -0,0 +1,53 @@ +package datadog.trace.instrumentation.grpc.server; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.context.TraceScope; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class InProcessServerStreamInstrumentation extends Instrumenter.Default { + + public InProcessServerStreamInstrumentation() { + super("grpc", "grpc-server"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream"); + } + + @Override + public Map, String> transformers() { + return singletonMap(isPublic(), getClass().getName() + "$DisableAsyncPropagationAdvice"); + } + + public static class DisableAsyncPropagationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static TraceScope enter() { + final TraceScope scope = activeScope(); + if (scope != null && scope.isAsyncPropagating()) { + scope.setAsyncPropagation(false); + return scope; + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Enter final TraceScope scopeToReenable) { + if (scopeToReenable != null) { + scopeToReenable.setAsyncPropagation(true); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index f477bc58ab..7c009b10b4 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -10,7 +10,6 @@ import io.grpc.Server import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder import io.grpc.stub.StreamObserver -import io.opentracing.noop.NoopSpan import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit @@ -37,9 +36,7 @@ class GrpcStreamingTest extends AgentTestRunner { (1..msgCount).each { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) observer.onNext(value) - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) } @@ -50,10 +47,8 @@ class GrpcStreamingTest extends AgentTestRunner { void onError(Throwable t) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) error.set(t) observer.onError(t) - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) } @@ -63,9 +58,7 @@ class GrpcStreamingTest extends AgentTestRunner { void onCompleted() { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) observer.onCompleted() - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) } From 206fbb944cc584a78586becfe2f4e3d1a6c7629b Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 6 Mar 2020 09:24:50 -0800 Subject: [PATCH 3/4] Fix comments [skip ci] --- .../grpc/server/InProcessServerStreamInstrumentation.java | 4 ++++ .../grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy | 3 --- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java index 439684e488..dba22ed9dd 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java @@ -14,6 +14,10 @@ import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +/** + * The InProcessTransport calls the client response in process, so we have to disable async + * propagation to allow spans to complete and be reported properly. + */ @AutoService(Instrumenter.class) public class InProcessServerStreamInstrumentation extends Instrumenter.Default { diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index 7c009b10b4..b852a1ba5b 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -35,7 +35,6 @@ class GrpcStreamingTest extends AgentTestRunner { (1..msgCount).each { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { - // The InProcessTransport calls the client response in process, so we have to disable async propagation. observer.onNext(value) } else { observer.onError(new IllegalStateException("not async propagating!")) @@ -46,7 +45,6 @@ class GrpcStreamingTest extends AgentTestRunner { @Override void onError(Throwable t) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { - // The InProcessTransport calls the client response in process, so we have to disable async propagation. error.set(t) observer.onError(t) } else { @@ -57,7 +55,6 @@ class GrpcStreamingTest extends AgentTestRunner { @Override void onCompleted() { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { - // The InProcessTransport calls the client response in process, so we have to disable async propagation. observer.onCompleted() } else { observer.onError(new IllegalStateException("not async propagating!")) From 7c73546ba17647f480c41638f746d30503d7f900 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 6 Mar 2020 09:56:19 -0800 Subject: [PATCH 4/4] One more change. This should fix the issues in latestDepTest. --- .../instrumentation/grpc/client/TracingClientInterceptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java index 5102dda936..0a61774c46 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java @@ -64,7 +64,7 @@ public class TracingClientInterceptor implements ClientInterceptor { propagate().inject(span, headers, SETTER); try (final AgentScope scope = activateSpan(span, false)) { - scope.setAsyncPropagation(true); + // Don't async propagate otherwise the span gets tied up with a timeout handler. super.start(new TracingClientCallListener<>(span, responseListener), headers); } catch (final Throwable e) { DECORATE.onError(span, e);