diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java new file mode 100644 index 0000000000..439684e488 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/InProcessServerStreamInstrumentation.java @@ -0,0 +1,53 @@ +package datadog.trace.instrumentation.grpc.server; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.context.TraceScope; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class InProcessServerStreamInstrumentation extends Instrumenter.Default { + + public InProcessServerStreamInstrumentation() { + super("grpc", "grpc-server"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream"); + } + + @Override + public Map, String> transformers() { + return singletonMap(isPublic(), getClass().getName() + "$DisableAsyncPropagationAdvice"); + } + + public static class DisableAsyncPropagationAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static TraceScope enter() { + final TraceScope scope = activeScope(); + if (scope != null && scope.isAsyncPropagating()) { + scope.setAsyncPropagation(false); + return scope; + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Enter final TraceScope scopeToReenable) { + if (scopeToReenable != null) { + scopeToReenable.setAsyncPropagation(true); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index f477bc58ab..7c009b10b4 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -10,7 +10,6 @@ import io.grpc.Server import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessServerBuilder import io.grpc.stub.StreamObserver -import io.opentracing.noop.NoopSpan import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit @@ -37,9 +36,7 @@ class GrpcStreamingTest extends AgentTestRunner { (1..msgCount).each { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) observer.onNext(value) - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) } @@ -50,10 +47,8 @@ class GrpcStreamingTest extends AgentTestRunner { void onError(Throwable t) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) error.set(t) observer.onError(t) - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) } @@ -63,9 +58,7 @@ class GrpcStreamingTest extends AgentTestRunner { void onCompleted() { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { // The InProcessTransport calls the client response in process, so we have to disable async propagation. - def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE) observer.onCompleted() - tempScope.close() } else { observer.onError(new IllegalStateException("not async propagating!")) }