Refactor ContinuableScope
This commit is contained in:
parent
6e62b79b8f
commit
0d7aa022db
|
@ -217,13 +217,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
||||||
protected final TraceScope.Continuation continuation;
|
protected final TraceScope.Continuation continuation;
|
||||||
|
|
||||||
public DatadogWrapper(TraceScope scope) {
|
public DatadogWrapper(TraceScope scope) {
|
||||||
continuation = scope.capture(true);
|
continuation = scope.capture();
|
||||||
log.debug("created continuation {} from scope {}", continuation, scope);
|
log.debug("created continuation {} from scope {}", continuation, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cancel() {
|
public void cancel() {
|
||||||
if (null != continuation) {
|
if (null != continuation) {
|
||||||
continuation.activate().close();
|
continuation.close();
|
||||||
log.debug("canceled continuation {}", continuation);
|
log.debug("canceled continuation {}", continuation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ public interface TraceScope {
|
||||||
*
|
*
|
||||||
* <p>Should be called on the parent thread.
|
* <p>Should be called on the parent thread.
|
||||||
*/
|
*/
|
||||||
Continuation capture(boolean finishOnClose);
|
Continuation capture();
|
||||||
|
|
||||||
/** Close the activated context and allow any underlying spans to finish. */
|
/** Close the activated context and allow any underlying spans to finish. */
|
||||||
void close();
|
void close();
|
||||||
|
@ -21,5 +21,8 @@ public interface TraceScope {
|
||||||
* <p>Should be called on the child thread.
|
* <p>Should be called on the child thread.
|
||||||
*/
|
*/
|
||||||
TraceScope activate();
|
TraceScope activate();
|
||||||
|
|
||||||
|
/** Cancel the continuation. */
|
||||||
|
void close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package datadog.opentracing.scopemanager;
|
package datadog.opentracing.scopemanager;
|
||||||
|
|
||||||
|
import datadog.opentracing.DDSpan;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.ScopeManager;
|
import io.opentracing.ScopeManager;
|
||||||
import io.opentracing.Span;
|
import io.opentracing.Span;
|
||||||
|
@ -17,7 +18,11 @@ public class ContextualScopeManager implements ScopeManager {
|
||||||
return context.activate(span, finishOnClose);
|
return context.activate(span, finishOnClose);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new ContinuableScope(this, span, finishOnClose);
|
if (span instanceof DDSpan) {
|
||||||
|
return new ContinuableScope(this, (DDSpan) span, finishOnClose);
|
||||||
|
} else {
|
||||||
|
return new SimpleScope(this, span, finishOnClose);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
package datadog.opentracing.scopemanager;
|
package datadog.opentracing.scopemanager;
|
||||||
|
|
||||||
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.opentracing.DDSpanContext;
|
import datadog.opentracing.DDSpanContext;
|
||||||
import datadog.opentracing.PendingTrace;
|
import datadog.opentracing.PendingTrace;
|
||||||
import datadog.trace.context.TraceScope;
|
import datadog.trace.context.TraceScope;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.Span;
|
|
||||||
import io.opentracing.noop.NoopScopeManager;
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.lang.ref.WeakReference;
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -14,26 +13,38 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class ContinuableScope implements Scope, TraceScope {
|
public class ContinuableScope implements Scope, TraceScope {
|
||||||
final ContextualScopeManager scopeManager;
|
/** ScopeManager holding the thread-local to this scope. */
|
||||||
final AtomicInteger refCount;
|
private final ContextualScopeManager scopeManager;
|
||||||
private final Span wrapped;
|
/**
|
||||||
|
* Span contained by this scope. Async scopes will hold a reference to the parent scope's span.
|
||||||
|
*/
|
||||||
|
private final DDSpan spanUnderScope;
|
||||||
|
/** If true, finish the span when openCount hits 0. */
|
||||||
private final boolean finishOnClose;
|
private final boolean finishOnClose;
|
||||||
|
/** Count of open scope and continuations */
|
||||||
|
private final AtomicInteger openCount;
|
||||||
|
/** Scope to placed in the thread local after close. May be null. */
|
||||||
private final Scope toRestore;
|
private final Scope toRestore;
|
||||||
|
/** Continuation that created this scope. May be null. */
|
||||||
|
private final Continuation continuation;
|
||||||
|
|
||||||
ContinuableScope(
|
ContinuableScope(
|
||||||
final ContextualScopeManager scopeManager, final Span wrapped, final boolean finishOnClose) {
|
final ContextualScopeManager scopeManager,
|
||||||
this(scopeManager, new AtomicInteger(1), wrapped, finishOnClose);
|
final DDSpan spanUnderScope,
|
||||||
|
final boolean finishOnClose) {
|
||||||
|
this(scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContinuableScope(
|
private ContinuableScope(
|
||||||
final ContextualScopeManager scopeManager,
|
final ContextualScopeManager scopeManager,
|
||||||
final AtomicInteger refCount,
|
final AtomicInteger openCount,
|
||||||
final Span wrapped,
|
final Continuation continuation,
|
||||||
|
final DDSpan spanUnderScope,
|
||||||
final boolean finishOnClose) {
|
final boolean finishOnClose) {
|
||||||
|
|
||||||
this.scopeManager = scopeManager;
|
this.scopeManager = scopeManager;
|
||||||
this.refCount = refCount;
|
this.openCount = openCount;
|
||||||
this.wrapped = wrapped;
|
this.continuation = continuation;
|
||||||
|
this.spanUnderScope = spanUnderScope;
|
||||||
this.finishOnClose = finishOnClose;
|
this.finishOnClose = finishOnClose;
|
||||||
this.toRestore = scopeManager.tlsScope.get();
|
this.toRestore = scopeManager.tlsScope.get();
|
||||||
scopeManager.tlsScope.set(this);
|
scopeManager.tlsScope.set(this);
|
||||||
|
@ -41,20 +52,22 @@ public class ContinuableScope implements Scope, TraceScope {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (scopeManager.tlsScope.get() != this) {
|
if (null != continuation) {
|
||||||
return;
|
spanUnderScope.context().getTrace().cancelContinuation(continuation);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (refCount.decrementAndGet() == 0 && finishOnClose) {
|
if (openCount.decrementAndGet() == 0 && finishOnClose) {
|
||||||
wrapped.finish();
|
spanUnderScope.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
scopeManager.tlsScope.set(toRestore);
|
if (scopeManager.tlsScope.get() == this) {
|
||||||
|
scopeManager.tlsScope.set(toRestore);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Span span() {
|
public DDSpan span() {
|
||||||
return wrapped;
|
return spanUnderScope;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -63,8 +76,8 @@ public class ContinuableScope implements Scope, TraceScope {
|
||||||
* @param finishOnClose
|
* @param finishOnClose
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public Continuation capture(final boolean finishOnClose) {
|
public Continuation capture() {
|
||||||
return new Continuation(this.finishOnClose && finishOnClose);
|
return new Continuation();
|
||||||
}
|
}
|
||||||
|
|
||||||
public class Continuation implements Closeable, TraceScope.Continuation {
|
public class Continuation implements Closeable, TraceScope.Continuation {
|
||||||
|
@ -72,72 +85,32 @@ public class ContinuableScope implements Scope, TraceScope {
|
||||||
|
|
||||||
private final AtomicBoolean used = new AtomicBoolean(false);
|
private final AtomicBoolean used = new AtomicBoolean(false);
|
||||||
private final PendingTrace trace;
|
private final PendingTrace trace;
|
||||||
private final boolean finishSpanOnClose;
|
|
||||||
|
|
||||||
private Continuation(final boolean finishOnClose) {
|
private Continuation() {
|
||||||
this.finishSpanOnClose = finishOnClose;
|
openCount.incrementAndGet();
|
||||||
refCount.incrementAndGet();
|
final DDSpanContext context = (DDSpanContext) spanUnderScope.context();
|
||||||
if (wrapped.context() instanceof DDSpanContext) {
|
trace = context.getTrace();
|
||||||
final DDSpanContext context = (DDSpanContext) wrapped.context();
|
trace.registerContinuation(this);
|
||||||
trace = context.getTrace();
|
|
||||||
trace.registerContinuation(this);
|
|
||||||
} else {
|
|
||||||
trace = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClosingScope activate() {
|
public ContinuableScope activate() {
|
||||||
if (used.compareAndSet(false, true)) {
|
if (used.compareAndSet(false, true)) {
|
||||||
for (final ScopeContext context : scopeManager.scopeContexts) {
|
return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose);
|
||||||
if (context.inContext()) {
|
|
||||||
return new ClosingScope(context.activate(wrapped, finishSpanOnClose));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return new ClosingScope(
|
|
||||||
new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose));
|
|
||||||
} else {
|
} else {
|
||||||
log.debug("Reusing a continuation not allowed. Returning no-op scope.");
|
log.debug(
|
||||||
return new ClosingScope(NoopScopeManager.NoopScope.INSTANCE);
|
"Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked.");
|
||||||
|
return new ContinuableScope(
|
||||||
|
scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
used.getAndSet(true);
|
if (used.compareAndSet(false, true)) {
|
||||||
if (trace != null) {
|
|
||||||
trace.cancelContinuation(this);
|
trace.cancelContinuation(this);
|
||||||
}
|
ContinuableScope.this.close();
|
||||||
}
|
} else {
|
||||||
|
log.debug("Failed to close continuation {}. Already used.", this);
|
||||||
private class ClosingScope implements Scope, TraceScope {
|
|
||||||
private final Scope wrappedScope;
|
|
||||||
|
|
||||||
private ClosingScope(final Scope wrappedScope) {
|
|
||||||
this.wrappedScope = wrappedScope;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Continuation capture(boolean finishOnClose) {
|
|
||||||
if (wrappedScope instanceof TraceScope) {
|
|
||||||
return ((TraceScope) wrappedScope).capture(finishOnClose);
|
|
||||||
} else {
|
|
||||||
log.debug(
|
|
||||||
"{} Failed to capture. ClosingScope does not wrap a TraceScope: {}.",
|
|
||||||
this,
|
|
||||||
wrappedScope);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
wrappedScope.close();
|
|
||||||
ContinuableScope.Continuation.this.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Span span() {
|
|
||||||
return wrappedScope.span();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
package datadog.opentracing.scopemanager;
|
||||||
|
|
||||||
|
import io.opentracing.Scope;
|
||||||
|
import io.opentracing.Span;
|
||||||
|
|
||||||
|
/** Simple scope implementation which does not propagate across threads. */
|
||||||
|
public class SimpleScope implements Scope {
|
||||||
|
private final ContextualScopeManager scopeManager;
|
||||||
|
private final Span spanUnderScope;
|
||||||
|
private final boolean finishOnClose;
|
||||||
|
private final Scope toRestore;
|
||||||
|
|
||||||
|
public SimpleScope(
|
||||||
|
final ContextualScopeManager scopeManager,
|
||||||
|
final Span spanUnderScope,
|
||||||
|
final boolean finishOnClose) {
|
||||||
|
this.scopeManager = scopeManager;
|
||||||
|
this.spanUnderScope = spanUnderScope;
|
||||||
|
this.finishOnClose = finishOnClose;
|
||||||
|
this.toRestore = scopeManager.tlsScope.get();
|
||||||
|
scopeManager.tlsScope.set(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (finishOnClose) {
|
||||||
|
spanUnderScope.finish();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scopeManager.tlsScope.get() == this) {
|
||||||
|
scopeManager.tlsScope.set(toRestore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Span span() {
|
||||||
|
return spanUnderScope;
|
||||||
|
}
|
||||||
|
}
|
|
@ -92,11 +92,11 @@ class ScopeManagerTest extends Specification {
|
||||||
finishSpan << [true, false]
|
finishSpan << [true, false]
|
||||||
}
|
}
|
||||||
|
|
||||||
def "ref counting scope doesn't close if non-zero"() {
|
def "ContinuableScope doesn't close if non-zero"() {
|
||||||
setup:
|
setup:
|
||||||
def builder = tracer.buildSpan("test")
|
def builder = tracer.buildSpan("test")
|
||||||
def scope = (ContinuableScope) builder.startActive(true)
|
def scope = (ContinuableScope) builder.startActive(true)
|
||||||
def continuation = scope.capture(true)
|
def continuation = scope.capture()
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
!spanFinished(scope.span())
|
!spanFinished(scope.span())
|
||||||
|
@ -149,7 +149,7 @@ class ScopeManagerTest extends Specification {
|
||||||
def builder = tracer.buildSpan("test")
|
def builder = tracer.buildSpan("test")
|
||||||
def scope = (ContinuableScope) builder.startActive(false)
|
def scope = (ContinuableScope) builder.startActive(false)
|
||||||
def span = scope.span()
|
def span = scope.span()
|
||||||
def continuation = scope.capture(true)
|
def continuation = scope.capture()
|
||||||
scope.close()
|
scope.close()
|
||||||
span.finish()
|
span.finish()
|
||||||
|
|
||||||
|
@ -188,7 +188,7 @@ class ScopeManagerTest extends Specification {
|
||||||
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
||||||
def childSpan = childScope.span()
|
def childSpan = childScope.span()
|
||||||
|
|
||||||
def continuation = childScope.capture(true)
|
def continuation = childScope.capture()
|
||||||
childScope.close()
|
childScope.close()
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
|
@ -209,11 +209,11 @@ class ScopeManagerTest extends Specification {
|
||||||
|
|
||||||
when:
|
when:
|
||||||
def newScope = continuation.activate()
|
def newScope = continuation.activate()
|
||||||
def newContinuation = newScope.capture(true)
|
def newContinuation = newScope.capture()
|
||||||
|
|
||||||
then:
|
then:
|
||||||
newScope instanceof ContinuableScope.Continuation.ClosingScope
|
newScope instanceof ContinuableScope
|
||||||
scopeManager.active() == newScope.wrappedScope
|
scopeManager.active() == newScope
|
||||||
newScope != childScope && newScope != parentScope
|
newScope != childScope && newScope != parentScope
|
||||||
newScope.span() == childSpan
|
newScope.span() == childSpan
|
||||||
!spanFinished(childSpan)
|
!spanFinished(childSpan)
|
||||||
|
@ -236,16 +236,16 @@ class ScopeManagerTest extends Specification {
|
||||||
def builder = tracer.buildSpan("test")
|
def builder = tracer.buildSpan("test")
|
||||||
def scope = (ContinuableScope) builder.startActive(false)
|
def scope = (ContinuableScope) builder.startActive(false)
|
||||||
def span = scope.span()
|
def span = scope.span()
|
||||||
def continuation = scope.capture(false)
|
def continuation = scope.capture()
|
||||||
scope.close()
|
scope.close()
|
||||||
span.finish()
|
span.finish()
|
||||||
|
|
||||||
def newScope = continuation.activate()
|
def newScope = continuation.activate()
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
newScope instanceof ContinuableScope.Continuation.ClosingScope
|
newScope instanceof ContinuableScope
|
||||||
newScope != scope
|
newScope != scope
|
||||||
scopeManager.active() == newScope.wrappedScope
|
scopeManager.active() == newScope
|
||||||
spanFinished(span)
|
spanFinished(span)
|
||||||
writer == []
|
writer == []
|
||||||
|
|
||||||
|
@ -259,24 +259,7 @@ class ScopeManagerTest extends Specification {
|
||||||
scopeManager.active() == null
|
scopeManager.active() == null
|
||||||
spanFinished(childSpan)
|
spanFinished(childSpan)
|
||||||
childSpan.context().parentId == span.context().spanId
|
childSpan.context().parentId == span.context().spanId
|
||||||
writer == []
|
|
||||||
|
|
||||||
when:
|
|
||||||
if (closeScope) {
|
|
||||||
newScope.close()
|
|
||||||
}
|
|
||||||
if (closeContinuation) {
|
|
||||||
continuation.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
then:
|
|
||||||
writer == [[childSpan, span]]
|
writer == [[childSpan, span]]
|
||||||
|
|
||||||
where:
|
|
||||||
closeScope | closeContinuation
|
|
||||||
true | false
|
|
||||||
false | true
|
|
||||||
true | true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
@Unroll
|
||||||
|
@ -327,40 +310,27 @@ class ScopeManagerTest extends Specification {
|
||||||
[new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _
|
[new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
def "ContinuableScope put in threadLocal after continuation activation"() {
|
||||||
def "threadlocal to context with capture (#active)"() {
|
|
||||||
setup:
|
setup:
|
||||||
contexts.each {
|
|
||||||
scopeManager.addScopeContext(it)
|
|
||||||
}
|
|
||||||
ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
scopeManager.tlsScope.get() == scope
|
scopeManager.tlsScope.get() == scope
|
||||||
|
|
||||||
when:
|
when:
|
||||||
def cont = scope.capture(true)
|
def cont = scope.capture()
|
||||||
scope.close()
|
scope.close()
|
||||||
|
|
||||||
then:
|
then:
|
||||||
scopeManager.tlsScope.get() == null
|
scopeManager.tlsScope.get() == null
|
||||||
|
|
||||||
when:
|
when:
|
||||||
active.each {
|
scopeManager.addScopeContext(new AtomicReferenceScope(true))
|
||||||
((AtomicBoolean) contexts[it].enabled).set(true)
|
def newScope = cont.activate()
|
||||||
}
|
|
||||||
cont.activate()
|
|
||||||
|
|
||||||
then:
|
then:
|
||||||
scopeManager.tlsScope.get() == null
|
newScope != scope
|
||||||
|
scopeManager.tlsScope.get() == newScope
|
||||||
where:
|
|
||||||
active | contexts
|
|
||||||
[0] | [new AtomicReferenceScope(false)]
|
|
||||||
[0] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
|
||||||
[1] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
|
||||||
[2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
|
||||||
[0, 2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
@Unroll
|
||||||
|
|
Loading…
Reference in New Issue