Allow closing of continuation explicitly
to allow a completed trace to be reported more timely. Continuation is closed when the returned scope is closed, but can also be closed directly.
This commit is contained in:
parent
087b2e7298
commit
f57faba5db
|
@ -83,11 +83,23 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
|
||||||
* completed, so we need to wait till continuations are de-referenced before reporting.
|
* completed, so we need to wait till continuations are de-referenced before reporting.
|
||||||
*/
|
*/
|
||||||
public void registerContinuation(final ContinuableScope.Continuation continuation) {
|
public void registerContinuation(final ContinuableScope.Continuation continuation) {
|
||||||
weakReferences.add(
|
continuation.ref =
|
||||||
new WeakReference<ContinuableScope.Continuation>(continuation, referenceQueue));
|
new WeakReference<ContinuableScope.Continuation>(continuation, referenceQueue);
|
||||||
|
weakReferences.add(continuation.ref);
|
||||||
pendingReferenceCount.incrementAndGet();
|
pendingReferenceCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void cancelContinuation(final ContinuableScope.Continuation continuation) {
|
||||||
|
synchronized (continuation) {
|
||||||
|
if (continuation.ref != null) {
|
||||||
|
weakReferences.remove(continuation.ref);
|
||||||
|
continuation.ref.clear();
|
||||||
|
continuation.ref = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expireReference();
|
||||||
|
}
|
||||||
|
|
||||||
private void expireReference() {
|
private void expireReference() {
|
||||||
if (pendingReferenceCount.decrementAndGet() == 0) {
|
if (pendingReferenceCount.decrementAndGet() == 0) {
|
||||||
write();
|
write();
|
||||||
|
|
|
@ -1,10 +1,17 @@
|
||||||
package datadog.opentracing.scopemanager;
|
package datadog.opentracing.scopemanager;
|
||||||
|
|
||||||
import datadog.opentracing.DDSpanContext;
|
import datadog.opentracing.DDSpanContext;
|
||||||
|
import datadog.opentracing.PendingTrace;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.Span;
|
import io.opentracing.Span;
|
||||||
|
import io.opentracing.noop.NoopScopeManager;
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public class ContinuableScope implements Scope {
|
public class ContinuableScope implements Scope {
|
||||||
final ContextualScopeManager scopeManager;
|
final ContextualScopeManager scopeManager;
|
||||||
final AtomicInteger refCount;
|
final AtomicInteger refCount;
|
||||||
|
@ -49,12 +56,21 @@ public class ContinuableScope implements Scope {
|
||||||
return wrapped;
|
return wrapped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The continuation returned should be closed after the associa
|
||||||
|
*
|
||||||
|
* @param finishOnClose
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public Continuation capture(final boolean finishOnClose) {
|
public Continuation capture(final boolean finishOnClose) {
|
||||||
return new Continuation(this.finishOnClose && finishOnClose);
|
return new Continuation(this.finishOnClose && finishOnClose);
|
||||||
}
|
}
|
||||||
|
|
||||||
public class Continuation {
|
public class Continuation implements Closeable {
|
||||||
|
public WeakReference<Continuation> ref;
|
||||||
|
|
||||||
|
private final AtomicBoolean used = new AtomicBoolean(false);
|
||||||
|
private final PendingTrace trace;
|
||||||
private final boolean finishSpanOnClose;
|
private final boolean finishSpanOnClose;
|
||||||
|
|
||||||
private Continuation(final boolean finishOnClose) {
|
private Continuation(final boolean finishOnClose) {
|
||||||
|
@ -62,17 +78,53 @@ public class ContinuableScope implements Scope {
|
||||||
refCount.incrementAndGet();
|
refCount.incrementAndGet();
|
||||||
if (wrapped.context() instanceof DDSpanContext) {
|
if (wrapped.context() instanceof DDSpanContext) {
|
||||||
final DDSpanContext context = (DDSpanContext) wrapped.context();
|
final DDSpanContext context = (DDSpanContext) wrapped.context();
|
||||||
context.getTrace().registerContinuation(this);
|
trace = context.getTrace();
|
||||||
|
trace.registerContinuation(this);
|
||||||
|
} else {
|
||||||
|
trace = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Scope activate() {
|
public Scope activate() {
|
||||||
for (final ScopeContext context : scopeManager.scopeContexts) {
|
if (used.compareAndSet(false, true)) {
|
||||||
if (context.inContext()) {
|
for (final ScopeContext context : scopeManager.scopeContexts) {
|
||||||
return context.activate(wrapped, finishSpanOnClose);
|
if (context.inContext()) {
|
||||||
|
return new ClosingScope(context.activate(wrapped, finishSpanOnClose));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return new ClosingScope(
|
||||||
|
new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose));
|
||||||
|
} else {
|
||||||
|
log.debug("Reusing a continuation not allowed. Returning no-op scope.");
|
||||||
|
return NoopScopeManager.NoopScope.INSTANCE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
used.getAndSet(true);
|
||||||
|
if (trace != null) {
|
||||||
|
trace.cancelContinuation(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ClosingScope implements Scope {
|
||||||
|
private final Scope wrapped;
|
||||||
|
|
||||||
|
private ClosingScope(final Scope wrapped) {
|
||||||
|
this.wrapped = wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
wrapped.close();
|
||||||
|
Continuation.this.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Span span() {
|
||||||
|
return wrapped.span();
|
||||||
}
|
}
|
||||||
return new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,20 +114,34 @@ class ScopeManagerTest extends Specification {
|
||||||
|
|
||||||
when:
|
when:
|
||||||
continuation.activate()
|
continuation.activate()
|
||||||
continuation = null // Continuation references also hold up traces.
|
if (forceGC) {
|
||||||
PendingTrace.awaitGC()
|
continuation = null // Continuation references also hold up traces.
|
||||||
((DDSpanContext) scopeManager.active().span().context()).trace.clean()
|
PendingTrace.awaitGC()
|
||||||
|
((DDSpanContext) scope.span().context()).trace.clean()
|
||||||
|
}
|
||||||
|
if (autoClose) {
|
||||||
|
if (continuation != null) {
|
||||||
|
continuation.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
scopeManager.active() != null
|
scopeManager.active() != null
|
||||||
|
|
||||||
when:
|
when:
|
||||||
scopeManager.active().close()
|
scopeManager.active().close()
|
||||||
|
writer.waitForTraces(1)
|
||||||
|
|
||||||
then:
|
then:
|
||||||
scopeManager.active() == null
|
scopeManager.active() == null
|
||||||
spanFinished(scope.span())
|
spanFinished(scope.span())
|
||||||
writer == [[scope.span()]]
|
writer == [[scope.span()]]
|
||||||
|
|
||||||
|
where:
|
||||||
|
autoClose | forceGC
|
||||||
|
true | true
|
||||||
|
true | false
|
||||||
|
false | true
|
||||||
}
|
}
|
||||||
|
|
||||||
def "hard reference on continuation prevents trace from reporting"() {
|
def "hard reference on continuation prevents trace from reporting"() {
|
||||||
|
@ -145,13 +159,26 @@ class ScopeManagerTest extends Specification {
|
||||||
writer == []
|
writer == []
|
||||||
|
|
||||||
when:
|
when:
|
||||||
// remove hard reference and force GC
|
if (forceGC) {
|
||||||
continuation = null
|
continuation = null // Continuation references also hold up traces.
|
||||||
PendingTrace.awaitGC()
|
PendingTrace.awaitGC()
|
||||||
span.context().trace.clean()
|
((DDSpanContext) span.context()).trace.clean()
|
||||||
|
writer.waitForTraces(1)
|
||||||
|
}
|
||||||
|
if (autoClose) {
|
||||||
|
if (continuation != null) {
|
||||||
|
continuation.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
writer == [[span]]
|
writer == [[span]]
|
||||||
|
|
||||||
|
where:
|
||||||
|
autoClose | forceGC
|
||||||
|
true | true
|
||||||
|
true | false
|
||||||
|
false | true
|
||||||
}
|
}
|
||||||
|
|
||||||
def "continuation restores trace"() {
|
def "continuation restores trace"() {
|
||||||
|
@ -161,7 +188,7 @@ class ScopeManagerTest extends Specification {
|
||||||
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
||||||
def childSpan = childScope.span()
|
def childSpan = childScope.span()
|
||||||
|
|
||||||
def cont = childScope.capture(true)
|
def continuation = childScope.capture(true)
|
||||||
childScope.close()
|
childScope.close()
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
|
@ -181,13 +208,10 @@ class ScopeManagerTest extends Specification {
|
||||||
writer == []
|
writer == []
|
||||||
|
|
||||||
when:
|
when:
|
||||||
def newScope = cont.activate()
|
def newScope = continuation.activate()
|
||||||
cont = null // Continuation references also hold up traces.
|
|
||||||
PendingTrace.awaitGC()
|
|
||||||
((DDSpanContext) scopeManager.active().span().context()).trace.clean()
|
|
||||||
|
|
||||||
then:
|
then:
|
||||||
scopeManager.active() == newScope
|
scopeManager.active() == newScope.wrapped
|
||||||
newScope != childScope && newScope != parentScope
|
newScope != childScope && newScope != parentScope
|
||||||
newScope.span() == childSpan
|
newScope.span() == childSpan
|
||||||
!spanFinished(childSpan)
|
!spanFinished(childSpan)
|
||||||
|
@ -217,7 +241,7 @@ class ScopeManagerTest extends Specification {
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
newScope != scope
|
newScope != scope
|
||||||
scopeManager.active() == newScope
|
scopeManager.active() == newScope.wrapped
|
||||||
spanFinished(span)
|
spanFinished(span)
|
||||||
writer == []
|
writer == []
|
||||||
|
|
||||||
|
@ -234,14 +258,21 @@ class ScopeManagerTest extends Specification {
|
||||||
writer == []
|
writer == []
|
||||||
|
|
||||||
when:
|
when:
|
||||||
// remove hard reference and force GC
|
if (closeScope) {
|
||||||
continuation = null
|
newScope.close()
|
||||||
PendingTrace.awaitGC()
|
}
|
||||||
span.context().trace.clean()
|
if (closeContinuation) {
|
||||||
writer.waitForTraces(1)
|
continuation.close()
|
||||||
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
writer == [[childSpan, span]]
|
writer == [[childSpan, span]]
|
||||||
|
|
||||||
|
where:
|
||||||
|
closeScope | closeContinuation
|
||||||
|
true | false
|
||||||
|
false | true
|
||||||
|
true | true
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
@Unroll
|
||||||
|
|
Loading…
Reference in New Issue