From 0d7aa022db8810a7ed881de965204da3bc805744 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Mon, 2 Apr 2018 11:31:50 -0700 Subject: [PATCH] Refactor ContinuableScope --- .../concurrent/ExecutorInstrumentation.java | 4 +- .../datadog/trace/context/TraceScope.java | 5 +- .../scopemanager/ContextualScopeManager.java | 7 +- .../scopemanager/ContinuableScope.java | 125 +++++++----------- .../opentracing/scopemanager/SimpleScope.java | 39 ++++++ .../scopemanager/ScopeManagerTest.groovy | 62 +++------ 6 files changed, 116 insertions(+), 126 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 84a06e5b5e..5ccea111f4 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -217,13 +217,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { protected final TraceScope.Continuation continuation; public DatadogWrapper(TraceScope scope) { - continuation = scope.capture(true); + continuation = scope.capture(); log.debug("created continuation {} from scope {}", continuation, scope); } public void cancel() { if (null != continuation) { - continuation.activate().close(); + continuation.close(); log.debug("canceled continuation {}", continuation); } } diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index 1886796edc..c859d14d2d 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -8,7 +8,7 @@ public interface TraceScope { * *

Should be called on the parent thread. */ - Continuation capture(boolean finishOnClose); + Continuation capture(); /** Close the activated context and allow any underlying spans to finish. */ void close(); @@ -21,5 +21,8 @@ public interface TraceScope { *

Should be called on the child thread. */ TraceScope activate(); + + /** Cancel the continuation. */ + void close(); } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java index 094e9f869e..e0283a21c8 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java @@ -1,5 +1,6 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import io.opentracing.Scope; import io.opentracing.ScopeManager; import io.opentracing.Span; @@ -17,7 +18,11 @@ public class ContextualScopeManager implements ScopeManager { return context.activate(span, finishOnClose); } } - return new ContinuableScope(this, span, finishOnClose); + if (span instanceof DDSpan) { + return new ContinuableScope(this, (DDSpan) span, finishOnClose); + } else { + return new SimpleScope(this, span, finishOnClose); + } } @Override diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 3250894705..943f2b1779 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -1,11 +1,10 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpanContext; import datadog.opentracing.PendingTrace; import datadog.trace.context.TraceScope; import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopScopeManager; import java.io.Closeable; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,26 +13,38 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class ContinuableScope implements Scope, TraceScope { - final ContextualScopeManager scopeManager; - final AtomicInteger refCount; - private final Span wrapped; + /** ScopeManager holding the thread-local to this scope. */ + private final ContextualScopeManager scopeManager; + /** + * Span contained by this scope. Async scopes will hold a reference to the parent scope's span. + */ + private final DDSpan spanUnderScope; + /** If true, finish the span when openCount hits 0. */ private final boolean finishOnClose; + /** Count of open scope and continuations */ + private final AtomicInteger openCount; + /** Scope to placed in the thread local after close. May be null. */ private final Scope toRestore; + /** Continuation that created this scope. May be null. */ + private final Continuation continuation; ContinuableScope( - final ContextualScopeManager scopeManager, final Span wrapped, final boolean finishOnClose) { - this(scopeManager, new AtomicInteger(1), wrapped, finishOnClose); + final ContextualScopeManager scopeManager, + final DDSpan spanUnderScope, + final boolean finishOnClose) { + this(scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } private ContinuableScope( final ContextualScopeManager scopeManager, - final AtomicInteger refCount, - final Span wrapped, + final AtomicInteger openCount, + final Continuation continuation, + final DDSpan spanUnderScope, final boolean finishOnClose) { - this.scopeManager = scopeManager; - this.refCount = refCount; - this.wrapped = wrapped; + this.openCount = openCount; + this.continuation = continuation; + this.spanUnderScope = spanUnderScope; this.finishOnClose = finishOnClose; this.toRestore = scopeManager.tlsScope.get(); scopeManager.tlsScope.set(this); @@ -41,20 +52,22 @@ public class ContinuableScope implements Scope, TraceScope { @Override public void close() { - if (scopeManager.tlsScope.get() != this) { - return; + if (null != continuation) { + spanUnderScope.context().getTrace().cancelContinuation(continuation); } - if (refCount.decrementAndGet() == 0 && finishOnClose) { - wrapped.finish(); + if (openCount.decrementAndGet() == 0 && finishOnClose) { + spanUnderScope.finish(); } - scopeManager.tlsScope.set(toRestore); + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); + } } @Override - public Span span() { - return wrapped; + public DDSpan span() { + return spanUnderScope; } /** @@ -63,8 +76,8 @@ public class ContinuableScope implements Scope, TraceScope { * @param finishOnClose * @return */ - public Continuation capture(final boolean finishOnClose) { - return new Continuation(this.finishOnClose && finishOnClose); + public Continuation capture() { + return new Continuation(); } public class Continuation implements Closeable, TraceScope.Continuation { @@ -72,72 +85,32 @@ public class ContinuableScope implements Scope, TraceScope { private final AtomicBoolean used = new AtomicBoolean(false); private final PendingTrace trace; - private final boolean finishSpanOnClose; - private Continuation(final boolean finishOnClose) { - this.finishSpanOnClose = finishOnClose; - refCount.incrementAndGet(); - if (wrapped.context() instanceof DDSpanContext) { - final DDSpanContext context = (DDSpanContext) wrapped.context(); - trace = context.getTrace(); - trace.registerContinuation(this); - } else { - trace = null; - } + private Continuation() { + openCount.incrementAndGet(); + final DDSpanContext context = (DDSpanContext) spanUnderScope.context(); + trace = context.getTrace(); + trace.registerContinuation(this); } - public ClosingScope activate() { + public ContinuableScope activate() { if (used.compareAndSet(false, true)) { - for (final ScopeContext context : scopeManager.scopeContexts) { - if (context.inContext()) { - return new ClosingScope(context.activate(wrapped, finishSpanOnClose)); - } - } - return new ClosingScope( - new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose)); + return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose); } else { - log.debug("Reusing a continuation not allowed. Returning no-op scope."); - return new ClosingScope(NoopScopeManager.NoopScope.INSTANCE); + log.debug( + "Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked."); + return new ContinuableScope( + scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } } @Override public void close() { - used.getAndSet(true); - if (trace != null) { + if (used.compareAndSet(false, true)) { trace.cancelContinuation(this); - } - } - - private class ClosingScope implements Scope, TraceScope { - private final Scope wrappedScope; - - private ClosingScope(final Scope wrappedScope) { - this.wrappedScope = wrappedScope; - } - - @Override - public Continuation capture(boolean finishOnClose) { - if (wrappedScope instanceof TraceScope) { - return ((TraceScope) wrappedScope).capture(finishOnClose); - } else { - log.debug( - "{} Failed to capture. ClosingScope does not wrap a TraceScope: {}.", - this, - wrappedScope); - return null; - } - } - - @Override - public void close() { - wrappedScope.close(); - ContinuableScope.Continuation.this.close(); - } - - @Override - public Span span() { - return wrappedScope.span(); + ContinuableScope.this.close(); + } else { + log.debug("Failed to close continuation {}. Already used.", this); } } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java new file mode 100644 index 0000000000..a8a0511c6b --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java @@ -0,0 +1,39 @@ +package datadog.opentracing.scopemanager; + +import io.opentracing.Scope; +import io.opentracing.Span; + +/** Simple scope implementation which does not propagate across threads. */ +public class SimpleScope implements Scope { + private final ContextualScopeManager scopeManager; + private final Span spanUnderScope; + private final boolean finishOnClose; + private final Scope toRestore; + + public SimpleScope( + final ContextualScopeManager scopeManager, + final Span spanUnderScope, + final boolean finishOnClose) { + this.scopeManager = scopeManager; + this.spanUnderScope = spanUnderScope; + this.finishOnClose = finishOnClose; + this.toRestore = scopeManager.tlsScope.get(); + scopeManager.tlsScope.set(this); + } + + @Override + public void close() { + if (finishOnClose) { + spanUnderScope.finish(); + } + + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); + } + } + + @Override + public Span span() { + return spanUnderScope; + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index d99a370eac..f03e6e0e88 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -92,11 +92,11 @@ class ScopeManagerTest extends Specification { finishSpan << [true, false] } - def "ref counting scope doesn't close if non-zero"() { + def "ContinuableScope doesn't close if non-zero"() { setup: def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(true) - def continuation = scope.capture(true) + def continuation = scope.capture() expect: !spanFinished(scope.span()) @@ -149,7 +149,7 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(true) + def continuation = scope.capture() scope.close() span.finish() @@ -188,7 +188,7 @@ class ScopeManagerTest extends Specification { ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) def childSpan = childScope.span() - def continuation = childScope.capture(true) + def continuation = childScope.capture() childScope.close() expect: @@ -209,11 +209,11 @@ class ScopeManagerTest extends Specification { when: def newScope = continuation.activate() - def newContinuation = newScope.capture(true) + def newContinuation = newScope.capture() then: - newScope instanceof ContinuableScope.Continuation.ClosingScope - scopeManager.active() == newScope.wrappedScope + newScope instanceof ContinuableScope + scopeManager.active() == newScope newScope != childScope && newScope != parentScope newScope.span() == childSpan !spanFinished(childSpan) @@ -236,16 +236,16 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(false) + def continuation = scope.capture() scope.close() span.finish() def newScope = continuation.activate() expect: - newScope instanceof ContinuableScope.Continuation.ClosingScope + newScope instanceof ContinuableScope newScope != scope - scopeManager.active() == newScope.wrappedScope + scopeManager.active() == newScope spanFinished(span) writer == [] @@ -259,24 +259,7 @@ class ScopeManagerTest extends Specification { scopeManager.active() == null spanFinished(childSpan) childSpan.context().parentId == span.context().spanId - writer == [] - - when: - if (closeScope) { - newScope.close() - } - if (closeContinuation) { - continuation.close() - } - - then: writer == [[childSpan, span]] - - where: - closeScope | closeContinuation - true | false - false | true - true | true } @Unroll @@ -327,40 +310,27 @@ class ScopeManagerTest extends Specification { [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _ } - @Unroll - def "threadlocal to context with capture (#active)"() { + def "ContinuableScope put in threadLocal after continuation activation"() { setup: - contexts.each { - scopeManager.addScopeContext(it) - } ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) expect: scopeManager.tlsScope.get() == scope when: - def cont = scope.capture(true) + def cont = scope.capture() scope.close() then: scopeManager.tlsScope.get() == null when: - active.each { - ((AtomicBoolean) contexts[it].enabled).set(true) - } - cont.activate() + scopeManager.addScopeContext(new AtomicReferenceScope(true)) + def newScope = cont.activate() then: - scopeManager.tlsScope.get() == null - - where: - active | contexts - [0] | [new AtomicReferenceScope(false)] - [0] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [1] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [0, 2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] + newScope != scope + scopeManager.tlsScope.get() == newScope } @Unroll