diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java index e5f6328af6..a87b6e0de6 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java @@ -83,11 +83,23 @@ public class PendingTrace extends ConcurrentLinkedDeque { * completed, so we need to wait till continuations are de-referenced before reporting. */ public void registerContinuation(final ContinuableScope.Continuation continuation) { - weakReferences.add( - new WeakReference(continuation, referenceQueue)); + continuation.ref = + new WeakReference(continuation, referenceQueue); + weakReferences.add(continuation.ref); pendingReferenceCount.incrementAndGet(); } + public void cancelContinuation(final ContinuableScope.Continuation continuation) { + synchronized (continuation) { + if (continuation.ref != null) { + weakReferences.remove(continuation.ref); + continuation.ref.clear(); + continuation.ref = null; + } + } + expireReference(); + } + private void expireReference() { if (pendingReferenceCount.decrementAndGet() == 0) { write(); diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 9a4a62650c..19bb127419 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -1,10 +1,17 @@ package datadog.opentracing.scopemanager; import datadog.opentracing.DDSpanContext; +import datadog.opentracing.PendingTrace; import io.opentracing.Scope; import io.opentracing.Span; +import io.opentracing.noop.NoopScopeManager; +import java.io.Closeable; +import java.lang.ref.WeakReference; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class ContinuableScope implements Scope { final ContextualScopeManager scopeManager; final AtomicInteger refCount; @@ -49,12 +56,21 @@ public class ContinuableScope implements Scope { return wrapped; } + /** + * The continuation returned should be closed after the associa + * + * @param finishOnClose + * @return + */ public Continuation capture(final boolean finishOnClose) { return new Continuation(this.finishOnClose && finishOnClose); } - public class Continuation { + public class Continuation implements Closeable { + public WeakReference ref; + private final AtomicBoolean used = new AtomicBoolean(false); + private final PendingTrace trace; private final boolean finishSpanOnClose; private Continuation(final boolean finishOnClose) { @@ -62,17 +78,53 @@ public class ContinuableScope implements Scope { refCount.incrementAndGet(); if (wrapped.context() instanceof DDSpanContext) { final DDSpanContext context = (DDSpanContext) wrapped.context(); - context.getTrace().registerContinuation(this); + trace = context.getTrace(); + trace.registerContinuation(this); + } else { + trace = null; } } public Scope activate() { - for (final ScopeContext context : scopeManager.scopeContexts) { - if (context.inContext()) { - return context.activate(wrapped, finishSpanOnClose); + if (used.compareAndSet(false, true)) { + for (final ScopeContext context : scopeManager.scopeContexts) { + if (context.inContext()) { + return new ClosingScope(context.activate(wrapped, finishSpanOnClose)); + } } + return new ClosingScope( + new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose)); + } else { + log.debug("Reusing a continuation not allowed. Returning no-op scope."); + return NoopScopeManager.NoopScope.INSTANCE; + } + } + + @Override + public void close() { + used.getAndSet(true); + if (trace != null) { + trace.cancelContinuation(this); + } + } + + private class ClosingScope implements Scope { + private final Scope wrapped; + + private ClosingScope(final Scope wrapped) { + this.wrapped = wrapped; + } + + @Override + public void close() { + wrapped.close(); + Continuation.this.close(); + } + + @Override + public Span span() { + return wrapped.span(); } - return new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose); } } } diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index 7536a67e4e..e5151c1071 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -114,20 +114,34 @@ class ScopeManagerTest extends Specification { when: continuation.activate() - continuation = null // Continuation references also hold up traces. - PendingTrace.awaitGC() - ((DDSpanContext) scopeManager.active().span().context()).trace.clean() + if (forceGC) { + continuation = null // Continuation references also hold up traces. + PendingTrace.awaitGC() + ((DDSpanContext) scope.span().context()).trace.clean() + } + if (autoClose) { + if (continuation != null) { + continuation.close() + } + } then: scopeManager.active() != null when: scopeManager.active().close() + writer.waitForTraces(1) then: scopeManager.active() == null spanFinished(scope.span()) writer == [[scope.span()]] + + where: + autoClose | forceGC + true | true + true | false + false | true } def "hard reference on continuation prevents trace from reporting"() { @@ -145,13 +159,26 @@ class ScopeManagerTest extends Specification { writer == [] when: - // remove hard reference and force GC - continuation = null - PendingTrace.awaitGC() - span.context().trace.clean() + if (forceGC) { + continuation = null // Continuation references also hold up traces. + PendingTrace.awaitGC() + ((DDSpanContext) span.context()).trace.clean() + writer.waitForTraces(1) + } + if (autoClose) { + if (continuation != null) { + continuation.close() + } + } then: writer == [[span]] + + where: + autoClose | forceGC + true | true + true | false + false | true } def "continuation restores trace"() { @@ -161,7 +188,7 @@ class ScopeManagerTest extends Specification { ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) def childSpan = childScope.span() - def cont = childScope.capture(true) + def continuation = childScope.capture(true) childScope.close() expect: @@ -181,13 +208,10 @@ class ScopeManagerTest extends Specification { writer == [] when: - def newScope = cont.activate() - cont = null // Continuation references also hold up traces. - PendingTrace.awaitGC() - ((DDSpanContext) scopeManager.active().span().context()).trace.clean() + def newScope = continuation.activate() then: - scopeManager.active() == newScope + scopeManager.active() == newScope.wrapped newScope != childScope && newScope != parentScope newScope.span() == childSpan !spanFinished(childSpan) @@ -217,7 +241,7 @@ class ScopeManagerTest extends Specification { expect: newScope != scope - scopeManager.active() == newScope + scopeManager.active() == newScope.wrapped spanFinished(span) writer == [] @@ -234,14 +258,21 @@ class ScopeManagerTest extends Specification { writer == [] when: - // remove hard reference and force GC - continuation = null - PendingTrace.awaitGC() - span.context().trace.clean() - writer.waitForTraces(1) + if (closeScope) { + newScope.close() + } + if (closeContinuation) { + continuation.close() + } then: writer == [[childSpan, span]] + + where: + closeScope | closeContinuation + true | false + false | true + true | true } @Unroll