Don't create continuations when async propagation is off

This commit is contained in:
Andrew Kent 2018-04-05 18:36:28 -04:00
parent a0a11a51d0
commit d594d6e8db
8 changed files with 62 additions and 30 deletions

View File

@ -33,21 +33,21 @@ class AkkaActors {
@Trace
def basicTell() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ! Greet
}
@Trace
def basicAsk() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ? Greet
}
@Trace
def basicForward() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
helloGreeter ! WhoToGreet("Akka")
helloGreeter ? Greet
}

View File

@ -13,7 +13,7 @@ class ScalaConcurrentTests {
*/
@Trace
def traceWithFutureAndCallbacks() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val goodFuture: Future[Integer] = Future {
tracedChild("goodFuture")
1
@ -34,7 +34,7 @@ class ScalaConcurrentTests {
@Trace
def tracedAcrossThreadsWithNoTrace() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val goodFuture: Future[Integer] = Future {
1
}
@ -54,7 +54,7 @@ class ScalaConcurrentTests {
*/
@Trace
def traceWithPromises() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val keptPromise = Promise[Boolean]()
val brokenPromise = Promise[Boolean]()
val afterPromise = keptPromise.future
@ -87,7 +87,7 @@ class ScalaConcurrentTests {
*/
@Trace
def tracedWithFutureFirstCompletions() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val completedVal = Future.firstCompletedOf(
List(
Future {
@ -111,7 +111,7 @@ class ScalaConcurrentTests {
*/
@Trace
def tracedTimeout(): Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true)
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val f: Future[String] = Future {
tracedChild("timeoutChild")
while(true) {

View File

@ -141,7 +141,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncLinking()
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new RunnableWrapper(task, (TraceScope) scope);
@ -165,7 +165,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Advice.Argument(value = 0, readOnly = false) Callable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncLinking()
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new CallableWrapper(task, (TraceScope) scope);
@ -188,7 +188,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
public static Collection<?> wrapJob(
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncLinking()) {
if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating()) {
Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable task : tasks) {
if (task != null) {
@ -247,7 +247,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Override
public void run() {
final TraceScope context = continuation.activate();
context.setAsyncLinking(true);
context.setAsyncPropagation(true);
try {
delegatee.run();
} finally {
@ -268,7 +268,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
@Override
public T call() throws Exception {
final TraceScope context = continuation.activate();
context.setAsyncLinking(true);
context.setAsyncPropagation(true);
try {
return delegatee.call();
} finally {

View File

@ -50,7 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true)
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span
m.invoke(pool, new AsyncChild())
// this child won't
@ -95,7 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true)
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
try {
for (int i = 0; i < 20; ++ i) {
Future f = pool.submit((Callable)child)

View File

@ -99,7 +99,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
}
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true);
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true);
}
return scope;
}
@ -175,7 +175,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
public Object apply(Throwable t, boolean isCheck) throws Exception {
try {
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false);
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
}
onError(span, t);
} catch (Throwable t2) {
@ -202,7 +202,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
public Result apply(Result result) {
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false);
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
}
try {
Tags.HTTP_STATUS.set(span, result.header().status());

View File

@ -14,10 +14,14 @@ public interface TraceScope {
void close();
/** If true, this context will propagate across async boundaries. */
boolean isAsyncLinking();
boolean isAsyncPropagating();
/** Set context's async propagation value. */
void setAsyncLinking(boolean value);
/**
* Enable or disable async propagation. Async propagation is initially set to false.
*
* @param value The new propagation value. True == propagate. False == don't propagate.
*/
void setAsyncPropagation(boolean value);
/** Used to pass async context between workers. */
interface Continuation {

View File

@ -28,7 +28,7 @@ public class ContinuableScope implements Scope, TraceScope {
/** Continuation that created this scope. May be null. */
private final Continuation continuation;
/** Flag to propagate this scope across async boundaries. */
private final AtomicBoolean isAsyncLinking = new AtomicBoolean(false);
private final AtomicBoolean isAsyncPropagating = new AtomicBoolean(false);
ContinuableScope(
final ContextualScopeManager scopeManager,
@ -73,23 +73,26 @@ public class ContinuableScope implements Scope, TraceScope {
}
@Override
public void setAsyncLinking(boolean value) {
isAsyncLinking.set(value);
public boolean isAsyncPropagating() {
return isAsyncPropagating.get();
}
@Override
public boolean isAsyncLinking() {
return isAsyncLinking.get();
public void setAsyncPropagation(boolean value) {
isAsyncPropagating.set(value);
}
/**
* The continuation returned should be closed after the associa
* The continuation returned must be closed or activated or the trace will not finish.
*
* @param finishOnClose
* @return
* @return The new continuation, or null if this scope is not async propagating.
*/
public Continuation capture() {
if (isAsyncPropagating()) {
return new Continuation();
} else {
return null;
}
}
public class Continuation implements Closeable, TraceScope.Continuation {

View File

@ -92,10 +92,30 @@ class ScopeManagerTest extends Specification {
finishSpan << [true, false]
}
def "ContinuableScope only creates continuations when propagation is set"() {
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(true)
def continuation = scope.capture()
expect:
continuation == null
when:
scope.setAsyncPropagation(true)
continuation = scope.capture()
then:
continuation != null
cleanup:
continuation.close()
}
def "ContinuableScope doesn't close if non-zero"() {
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(true)
scope.setAsyncPropagation(true)
def continuation = scope.capture()
expect:
@ -149,6 +169,7 @@ class ScopeManagerTest extends Specification {
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(false)
def span = scope.span()
scope.setAsyncPropagation(true)
def continuation = scope.capture()
scope.close()
span.finish()
@ -186,6 +207,7 @@ class ScopeManagerTest extends Specification {
def parentScope = tracer.buildSpan("parent").startActive(true)
def parentSpan = parentScope.span()
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
childScope.setAsyncPropagation(true)
def childSpan = childScope.span()
def continuation = childScope.capture()
@ -209,6 +231,7 @@ class ScopeManagerTest extends Specification {
when:
def newScope = continuation.activate()
newScope.setAsyncPropagation(true)
def newContinuation = newScope.capture()
then:
@ -236,6 +259,7 @@ class ScopeManagerTest extends Specification {
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(false)
def span = scope.span()
scope.setAsyncPropagation(true)
def continuation = scope.capture()
scope.close()
span.finish()
@ -313,6 +337,7 @@ class ScopeManagerTest extends Specification {
def "ContinuableScope put in threadLocal after continuation activation"() {
setup:
ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
scope.setAsyncPropagation(true)
expect:
scopeManager.tlsScope.get() == scope