disable async propagation for InProcess communication.

This commit is contained in:
Tyler Benson 2020-03-06 09:20:00 -08:00
parent cea0bc8a52
commit 2e86ca2141
2 changed files with 53 additions and 7 deletions

View File

@ -0,0 +1,53 @@
package datadog.trace.instrumentation.grpc.server;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class InProcessServerStreamInstrumentation extends Instrumenter.Default {
public InProcessServerStreamInstrumentation() {
super("grpc", "grpc-server");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.grpc.inprocess.InProcessTransport$InProcessStream$InProcessServerStream");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(isPublic(), getClass().getName() + "$DisableAsyncPropagationAdvice");
}
public static class DisableAsyncPropagationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static TraceScope enter() {
final TraceScope scope = activeScope();
if (scope != null && scope.isAsyncPropagating()) {
scope.setAsyncPropagation(false);
return scope;
}
return null;
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Enter final TraceScope scopeToReenable) {
if (scopeToReenable != null) {
scopeToReenable.setAsyncPropagation(true);
}
}
}
}

View File

@ -10,7 +10,6 @@ import io.grpc.Server
import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver import io.grpc.stub.StreamObserver
import io.opentracing.noop.NoopSpan
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -37,9 +36,7 @@ class GrpcStreamingTest extends AgentTestRunner {
(1..msgCount).each { (1..msgCount).each {
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
// The InProcessTransport calls the client response in process, so we have to disable async propagation. // The InProcessTransport calls the client response in process, so we have to disable async propagation.
def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE)
observer.onNext(value) observer.onNext(value)
tempScope.close()
} else { } else {
observer.onError(new IllegalStateException("not async propagating!")) observer.onError(new IllegalStateException("not async propagating!"))
} }
@ -50,10 +47,8 @@ class GrpcStreamingTest extends AgentTestRunner {
void onError(Throwable t) { void onError(Throwable t) {
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
// The InProcessTransport calls the client response in process, so we have to disable async propagation. // The InProcessTransport calls the client response in process, so we have to disable async propagation.
def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE)
error.set(t) error.set(t)
observer.onError(t) observer.onError(t)
tempScope.close()
} else { } else {
observer.onError(new IllegalStateException("not async propagating!")) observer.onError(new IllegalStateException("not async propagating!"))
} }
@ -63,9 +58,7 @@ class GrpcStreamingTest extends AgentTestRunner {
void onCompleted() { void onCompleted() {
if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) { if ((testTracer.scopeManager().active() as ContinuableScope).isAsyncPropagating()) {
// The InProcessTransport calls the client response in process, so we have to disable async propagation. // The InProcessTransport calls the client response in process, so we have to disable async propagation.
def tempScope = testTracer.scopeManager().activate(NoopSpan.INSTANCE)
observer.onCompleted() observer.onCompleted()
tempScope.close()
} else { } else {
observer.onError(new IllegalStateException("not async propagating!")) observer.onError(new IllegalStateException("not async propagating!"))
} }