Use ScopeManager to enable/disable low level async instrumentation.

This commit is contained in:
Andrew Kent 2018-04-02 16:59:57 -07:00
parent 0d7aa022db
commit a0a11a51d0
7 changed files with 62 additions and 13 deletions

View File

@ -1,4 +1,5 @@
import datadog.trace.api.Trace import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import akka.pattern.ask import akka.pattern.ask
import io.opentracing.util.GlobalTracer import io.opentracing.util.GlobalTracer
@ -32,18 +33,21 @@ class AkkaActors {
@Trace @Trace
def basicTell() : Unit = { def basicTell() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ! Greet howdyGreeter ! Greet
} }
@Trace @Trace
def basicAsk() : Unit = { def basicAsk() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ? Greet howdyGreeter ? Greet
} }
@Trace @Trace
def basicForward() : Unit = { def basicForward() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
helloGreeter ! WhoToGreet("Akka") helloGreeter ! WhoToGreet("Akka")
helloGreeter ? Greet helloGreeter ? Greet
} }

View File

@ -1,4 +1,5 @@
import datadog.trace.api.Trace import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import io.opentracing.util.GlobalTracer import io.opentracing.util.GlobalTracer
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
@ -12,6 +13,7 @@ class ScalaConcurrentTests {
*/ */
@Trace @Trace
def traceWithFutureAndCallbacks() : Integer = { def traceWithFutureAndCallbacks() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
val goodFuture: Future[Integer] = Future { val goodFuture: Future[Integer] = Future {
tracedChild("goodFuture") tracedChild("goodFuture")
1 1
@ -32,6 +34,7 @@ class ScalaConcurrentTests {
@Trace @Trace
def tracedAcrossThreadsWithNoTrace() :Integer = { def tracedAcrossThreadsWithNoTrace() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
val goodFuture: Future[Integer] = Future { val goodFuture: Future[Integer] = Future {
1 1
} }
@ -51,6 +54,7 @@ class ScalaConcurrentTests {
*/ */
@Trace @Trace
def traceWithPromises() : Integer = { def traceWithPromises() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
val keptPromise = Promise[Boolean]() val keptPromise = Promise[Boolean]()
val brokenPromise = Promise[Boolean]() val brokenPromise = Promise[Boolean]()
val afterPromise = keptPromise.future val afterPromise = keptPromise.future
@ -83,6 +87,7 @@ class ScalaConcurrentTests {
*/ */
@Trace @Trace
def tracedWithFutureFirstCompletions() :Integer = { def tracedWithFutureFirstCompletions() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
val completedVal = Future.firstCompletedOf( val completedVal = Future.firstCompletedOf(
List( List(
Future { Future {
@ -106,6 +111,7 @@ class ScalaConcurrentTests {
*/ */
@Trace @Trace
def tracedTimeout(): Integer = { def tracedTimeout(): Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
val f: Future[String] = Future { val f: Future[String] = Future {
tracedChild("timeoutChild") tracedChild("timeoutChild")
while(true) { while(true) {

View File

@ -140,7 +140,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
public static DatadogWrapper wrapJob( public static DatadogWrapper wrapJob(
@Advice.Argument(value = 0, readOnly = false) Runnable task) { @Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncLinking()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new RunnableWrapper(task, (TraceScope) scope); task = new RunnableWrapper(task, (TraceScope) scope);
return (RunnableWrapper) task; return (RunnableWrapper) task;
} }
@ -161,7 +164,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
public static DatadogWrapper wrapJob( public static DatadogWrapper wrapJob(
@Advice.Argument(value = 0, readOnly = false) Callable task) { @Advice.Argument(value = 0, readOnly = false) Callable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncLinking()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new CallableWrapper(task, (TraceScope) scope); task = new CallableWrapper(task, (TraceScope) scope);
return (CallableWrapper) task; return (CallableWrapper) task;
} }
@ -182,7 +188,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
public static Collection<?> wrapJob( public static Collection<?> wrapJob(
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) { @Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope) { if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncLinking()) {
Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size()); Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable task : tasks) { for (Callable task : tasks) {
if (task != null) { if (task != null) {
@ -241,6 +247,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Override @Override
public void run() { public void run() {
final TraceScope context = continuation.activate(); final TraceScope context = continuation.activate();
context.setAsyncLinking(true);
try { try {
delegatee.run(); delegatee.run();
} finally { } finally {
@ -261,6 +268,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Override @Override
public T call() throws Exception { public T call() throws Exception {
final TraceScope context = continuation.activate(); final TraceScope context = continuation.activate();
context.setAsyncLinking(true);
try { try {
return delegatee.call(); return delegatee.call();
} finally { } finally {

View File

@ -1,6 +1,8 @@
import datadog.opentracing.DDSpan import datadog.opentracing.DDSpan
import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace import datadog.trace.api.Trace
import io.opentracing.util.GlobalTracer
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Unroll import spock.lang.Unroll
@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override @Override
@Trace(operationName = "parent") @Trace(operationName = "parent")
void run() { void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true)
// this child will have a span // this child will have a span
m.invoke(pool, new AsyncChild()) m.invoke(pool, new AsyncChild())
// this child won't // this child won't
@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override @Override
@Trace(operationName = "parent") @Trace(operationName = "parent")
void run() { void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true)
try { try {
for (int i = 0; i < 20; ++ i) { for (int i = 0; i < 20; ++ i) {
Future f = pool.submit((Callable)child) Future f = pool.submit((Callable)child)

View File

@ -9,6 +9,7 @@ import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.*; import datadog.trace.agent.tooling.*;
import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.SpanContext; import io.opentracing.SpanContext;
@ -77,9 +78,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
public static class PlayAdvice { public static class PlayAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(@Advice.Argument(0) final Request req) { public static Scope startSpan(@Advice.Argument(0) final Request req) {
// TODO final Scope scope;
// begin tracking across threads
if (GlobalTracer.get().activeSpan() == null) { if (GlobalTracer.get().activeSpan() == null) {
final SpanContext extractedContext; final SpanContext extractedContext;
if (GlobalTracer.get().scopeManager().active() == null) { if (GlobalTracer.get().scopeManager().active() == null) {
@ -88,15 +87,21 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
} else { } else {
extractedContext = null; extractedContext = null;
} }
return GlobalTracer.get() scope =
GlobalTracer.get()
.buildSpan("play.request") .buildSpan("play.request")
.asChildOf(extractedContext) .asChildOf(extractedContext)
.startActive(false); .startActive(false);
} else { } else {
// An upstream framework (e.g. akka-http, netty) has already started the span. // An upstream framework (e.g. akka-http, netty) has already started the span.
// Do not extract the context. // Do not extract the context.
return GlobalTracer.get().buildSpan("play.request").startActive(false); scope = GlobalTracer.get().buildSpan("play.request").startActive(false);
} }
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true);
}
return scope;
} }
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -169,6 +174,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
@Override @Override
public Object apply(Throwable t, boolean isCheck) throws Exception { public Object apply(Throwable t, boolean isCheck) throws Exception {
try { try {
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false);
}
onError(span, t); onError(span, t);
} catch (Throwable t2) { } catch (Throwable t2) {
LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t); LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t);
@ -193,8 +201,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
} }
public Result apply(Result result) { public Result apply(Result result) {
// TODO if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
// stop tracking across threads ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false);
}
try { try {
Tags.HTTP_STATUS.set(span, result.header().status()); Tags.HTTP_STATUS.set(span, result.header().status());
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -13,6 +13,12 @@ public interface TraceScope {
/** Close the activated context and allow any underlying spans to finish. */ /** Close the activated context and allow any underlying spans to finish. */
void close(); void close();
/** If true, this context will propagate across async boundaries. */
boolean isAsyncLinking();
/** Set context's async propagation value. */
void setAsyncLinking(boolean value);
/** Used to pass async context between workers. */ /** Used to pass async context between workers. */
interface Continuation { interface Continuation {
/** /**

View File

@ -27,6 +27,8 @@ public class ContinuableScope implements Scope, TraceScope {
private final Scope toRestore; private final Scope toRestore;
/** Continuation that created this scope. May be null. */ /** Continuation that created this scope. May be null. */
private final Continuation continuation; private final Continuation continuation;
/** Flag to propagate this scope across async boundaries. */
private final AtomicBoolean isAsyncLinking = new AtomicBoolean(false);
ContinuableScope( ContinuableScope(
final ContextualScopeManager scopeManager, final ContextualScopeManager scopeManager,
@ -70,6 +72,16 @@ public class ContinuableScope implements Scope, TraceScope {
return spanUnderScope; return spanUnderScope;
} }
@Override
public void setAsyncLinking(boolean value) {
isAsyncLinking.set(value);
}
@Override
public boolean isAsyncLinking() {
return isAsyncLinking.get();
}
/** /**
* The continuation returned should be closed after the associa * The continuation returned should be closed after the associa
* *