Introduce ScopeContext and ReferenceCountingScopes
This commit is contained in:
parent
f78c290b40
commit
27c1d71504
|
@ -58,6 +58,8 @@ public class DDSpanContext implements io.opentracing.SpanContext {
|
|||
/** Tags are associated to the current span, they will not propagate to the children span */
|
||||
private Map<String, Object> tags;
|
||||
|
||||
@JsonIgnore public final boolean useRefCounting;
|
||||
|
||||
public DDSpanContext(
|
||||
final long traceId,
|
||||
final long spanId,
|
||||
|
@ -71,7 +73,8 @@ public class DDSpanContext implements io.opentracing.SpanContext {
|
|||
final String spanType,
|
||||
final Map<String, Object> tags,
|
||||
final Queue<DDSpan> trace,
|
||||
final DDTracer tracer) {
|
||||
final DDTracer tracer,
|
||||
final boolean useRefCounting) {
|
||||
|
||||
this.traceId = traceId;
|
||||
this.spanId = spanId;
|
||||
|
@ -92,6 +95,8 @@ public class DDSpanContext implements io.opentracing.SpanContext {
|
|||
|
||||
this.tags = tags;
|
||||
|
||||
this.useRefCounting = useRefCounting;
|
||||
|
||||
if (trace == null) {
|
||||
// TODO: figure out better concurrency model.
|
||||
this.trace = new ConcurrentLinkedQueue<>();
|
||||
|
|
|
@ -6,6 +6,8 @@ import datadog.opentracing.decorators.AbstractDecorator;
|
|||
import datadog.opentracing.decorators.DDDecoratorsFactory;
|
||||
import datadog.opentracing.propagation.Codec;
|
||||
import datadog.opentracing.propagation.HTTPCodec;
|
||||
import datadog.opentracing.scopemanager.ContextualScopeManager;
|
||||
import datadog.opentracing.scopemanager.ScopeContext;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.common.DDTraceConfig;
|
||||
import datadog.trace.common.Service;
|
||||
|
@ -21,7 +23,6 @@ import io.opentracing.ScopeManager;
|
|||
import io.opentracing.Span;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.propagation.Format;
|
||||
import io.opentracing.util.ThreadLocalScopeManager;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -29,6 +30,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -44,16 +48,17 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
final Writer writer;
|
||||
/** Sampler defines the sampling policy in order to reduce the number of traces for instance */
|
||||
final Sampler sampler;
|
||||
/**
|
||||
* Scope manager is in charge of managing the scopes from which spans are created
|
||||
*/
|
||||
final ScopeManager scopeManager;
|
||||
/** Scope manager is in charge of managing the scopes from which spans are created */
|
||||
final ContextualScopeManager scopeManager = new ContextualScopeManager();
|
||||
|
||||
/** A set of tags that are added to every span */
|
||||
private final Map<String, Object> spanTags;
|
||||
|
||||
/** Span context decorators */
|
||||
private final Map<String, List<AbstractDecorator>> spanContextDecorators = new HashMap<>();
|
||||
private final Map<String, List<AbstractDecorator>> spanContextDecorators =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private final Set<ScopeContext> contextualScopeManagers = new ConcurrentSkipListSet<>();
|
||||
|
||||
private final CodecRegistry registry;
|
||||
private final Map<String, Service> services = new HashMap<>();
|
||||
|
@ -72,8 +77,7 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
config.getProperty(DDTraceConfig.SERVICE_NAME),
|
||||
Writer.Builder.forConfig(config),
|
||||
Sampler.Builder.forConfig(config),
|
||||
new ThreadLocalScopeManager(),
|
||||
DDTraceConfig.parseMap(config.getProperty(DDTraceConfig.SPAN_TAGS)));
|
||||
DDTraceConfig.parseMap(config.getProperty(DDTraceConfig.SPAN_TAGS)));
|
||||
log.debug("Using config: {}", config);
|
||||
|
||||
// Create decorators from resource files
|
||||
|
@ -85,20 +89,18 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
}
|
||||
|
||||
public DDTracer(final String serviceName, final Writer writer, final Sampler sampler) {
|
||||
this(serviceName, writer, sampler, new ThreadLocalScopeManager(), Collections.<String, Object>emptyMap());
|
||||
this(serviceName, writer, sampler, Collections.<String, Object>emptyMap());
|
||||
}
|
||||
|
||||
public DDTracer(
|
||||
final String serviceName,
|
||||
final Writer writer,
|
||||
final Sampler sampler,
|
||||
final ScopeManager scopeManager,
|
||||
final Map<String, Object> spanTags) {
|
||||
this.serviceName = serviceName;
|
||||
this.writer = writer;
|
||||
this.writer.start();
|
||||
this.sampler = sampler;
|
||||
this.scopeManager = scopeManager;
|
||||
this.spanTags = spanTags;
|
||||
|
||||
registry = new CodecRegistry();
|
||||
|
@ -116,8 +118,7 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
UNASSIGNED_DEFAULT_SERVICE_NAME,
|
||||
writer,
|
||||
new AllSampler(),
|
||||
new ThreadLocalScopeManager(),
|
||||
DDTraceConfig.parseMap(new DDTraceConfig().getProperty(DDTraceConfig.SPAN_TAGS)));
|
||||
DDTraceConfig.parseMap(new DDTraceConfig().getProperty(DDTraceConfig.SPAN_TAGS)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,8 +146,12 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
spanContextDecorators.put(decorator.getMatchingTag(), list);
|
||||
}
|
||||
|
||||
public void addScopeContext(final ScopeContext context) {
|
||||
scopeManager.addScopeContext(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScopeManager scopeManager() {
|
||||
public ContextualScopeManager scopeManager() {
|
||||
return scopeManager;
|
||||
}
|
||||
|
||||
|
@ -274,6 +279,7 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
private boolean errorFlag;
|
||||
private String spanType;
|
||||
private boolean ignoreScope = false;
|
||||
private boolean useRefCounting = false;
|
||||
|
||||
public DDSpanBuilder(final String operationName, final ScopeManager scopeManager) {
|
||||
this.operationName = operationName;
|
||||
|
@ -335,7 +341,11 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
|
||||
@Override
|
||||
public DDSpanBuilder withTag(final String tag, final boolean bool) {
|
||||
return withTag(tag, (Object) bool);
|
||||
if (bool && tag.equals("dd.use.ref.counting")) {
|
||||
return withReferenceCounting();
|
||||
} else {
|
||||
return withTag(tag, (Object) bool);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -359,6 +369,11 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
return this;
|
||||
}
|
||||
|
||||
public DDSpanBuilder withReferenceCounting() {
|
||||
this.useRefCounting = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DDSpanBuilder withSpanType(final String spanType) {
|
||||
this.spanType = spanType;
|
||||
return this;
|
||||
|
@ -466,7 +481,8 @@ public class DDTracer implements io.opentracing.Tracer {
|
|||
spanType,
|
||||
this.tags,
|
||||
parentTrace,
|
||||
DDTracer.this);
|
||||
DDTracer.this,
|
||||
useRefCounting);
|
||||
|
||||
return context;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,8 @@ public class HTTPCodec implements Codec<TextMap> {
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
null,
|
||||
false);
|
||||
context.lockSamplingPriority();
|
||||
|
||||
log.debug("{} - Parent context extracted", context);
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
package datadog.opentracing.scopemanager;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.ScopeManager;
|
||||
import io.opentracing.Span;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ContextualScopeManager implements ScopeManager {
|
||||
final ThreadLocal<Scope> tlsScope = new ThreadLocal<>();
|
||||
final Set<ScopeContext> scopeContexts = new CopyOnWriteArraySet<>();
|
||||
|
||||
@Override
|
||||
public Scope activate(final Span span, final boolean finishOnClose) {
|
||||
for (final ScopeContext context : scopeContexts) {
|
||||
if (context.inContext()) {
|
||||
return context.activate(span, finishOnClose);
|
||||
}
|
||||
}
|
||||
if (span instanceof DDSpan && ((DDSpan) span).context().useRefCounting) {
|
||||
return new RefCountingScope(this, new AtomicInteger(1), span);
|
||||
} else {
|
||||
return new ThreadLocalScope(this, span, finishOnClose);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scope active() {
|
||||
for (final ScopeContext csm : scopeContexts) {
|
||||
if (csm.inContext()) {
|
||||
return csm.active();
|
||||
}
|
||||
}
|
||||
return tlsScope.get();
|
||||
}
|
||||
|
||||
public void addScopeContext(final ScopeContext context) {
|
||||
scopeContexts.add(context);
|
||||
}
|
||||
|
||||
class ThreadLocalScope implements Scope {
|
||||
private final ContextualScopeManager scopeManager;
|
||||
private final Span wrapped;
|
||||
private final boolean finishOnClose;
|
||||
private final Scope toRestore;
|
||||
|
||||
ThreadLocalScope(
|
||||
final ContextualScopeManager scopeManager,
|
||||
final Span wrapped,
|
||||
final boolean finishOnClose) {
|
||||
this.scopeManager = scopeManager;
|
||||
this.wrapped = wrapped;
|
||||
this.finishOnClose = finishOnClose;
|
||||
this.toRestore = scopeManager.tlsScope.get();
|
||||
scopeManager.tlsScope.set(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (scopeManager.tlsScope.get() != this) {
|
||||
// This shouldn't happen if users call methods in the expected order. Bail out.
|
||||
return;
|
||||
}
|
||||
|
||||
if (finishOnClose) {
|
||||
wrapped.finish();
|
||||
}
|
||||
|
||||
scopeManager.tlsScope.set(toRestore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span span() {
|
||||
return wrapped;
|
||||
}
|
||||
}
|
||||
|
||||
public class RefCountingScope implements Scope {
|
||||
final ContextualScopeManager manager;
|
||||
final AtomicInteger refCount;
|
||||
private final Span wrapped;
|
||||
private final Scope toRestore;
|
||||
|
||||
RefCountingScope(
|
||||
final ContextualScopeManager manager, final AtomicInteger refCount, final Span wrapped) {
|
||||
this.manager = manager;
|
||||
this.refCount = refCount;
|
||||
this.wrapped = wrapped;
|
||||
this.toRestore = manager.tlsScope.get();
|
||||
manager.tlsScope.set(this);
|
||||
}
|
||||
|
||||
public class Continuation {
|
||||
public Continuation() {
|
||||
refCount.incrementAndGet();
|
||||
}
|
||||
|
||||
public RefCountingScope activate() {
|
||||
return new RefCountingScope(manager, refCount, wrapped);
|
||||
}
|
||||
}
|
||||
|
||||
public RefCountingScope.Continuation capture() {
|
||||
return new RefCountingScope.Continuation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (manager.tlsScope.get() != this) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (refCount.decrementAndGet() == 0) {
|
||||
wrapped.finish();
|
||||
}
|
||||
|
||||
manager.tlsScope.set(toRestore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span span() {
|
||||
return wrapped;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package datadog.opentracing.scopemanager;
|
||||
|
||||
import io.opentracing.ScopeManager;
|
||||
|
||||
/** Represents a ScopeManager that is only valid in certain cases such as on a specific thread. */
|
||||
public interface ScopeContext extends ScopeManager {
|
||||
|
||||
/**
|
||||
* When multiple ScopeContexts are active, the first one to respond true will have control.
|
||||
*
|
||||
* @return true if this ScopeContext should be active
|
||||
*/
|
||||
boolean inContext();
|
||||
}
|
|
@ -49,7 +49,8 @@ class DDSpanSerializationTest extends Specification {
|
|||
"type",
|
||||
tags,
|
||||
null,
|
||||
null)
|
||||
null,
|
||||
false)
|
||||
|
||||
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())
|
||||
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
|
||||
|
|
|
@ -96,7 +96,8 @@ class URLAsResourceNameTest extends Specification {
|
|||
"fakeType",
|
||||
tags,
|
||||
null,
|
||||
null)
|
||||
null,
|
||||
false)
|
||||
|
||||
then:
|
||||
decorator.afterSetTag(context, Tags.HTTP_URL.getKey(), value)
|
||||
|
|
|
@ -25,24 +25,25 @@ class HTTPCodecTest extends Specification {
|
|||
setup:
|
||||
final DDSpanContext mockedContext =
|
||||
new DDSpanContext(
|
||||
1L,
|
||||
2L,
|
||||
0L,
|
||||
"fakeService",
|
||||
"fakeOperation",
|
||||
"fakeResource",
|
||||
samplingPriority,
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("k1", "v1")
|
||||
put("k2", "v2")
|
||||
}
|
||||
},
|
||||
false,
|
||||
"fakeType",
|
||||
null,
|
||||
null,
|
||||
null)
|
||||
1L,
|
||||
2L,
|
||||
0L,
|
||||
"fakeService",
|
||||
"fakeOperation",
|
||||
"fakeResource",
|
||||
samplingPriority,
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("k1", "v1")
|
||||
put("k2", "v2")
|
||||
}
|
||||
},
|
||||
false,
|
||||
"fakeType",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
false)
|
||||
|
||||
final Map<String, String> carrier = new HashMap<>()
|
||||
|
||||
|
|
|
@ -0,0 +1,215 @@
|
|||
package datadog.opentracing.scopemanager
|
||||
|
||||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.DDTracer
|
||||
import datadog.trace.common.writer.ListWriter
|
||||
import io.opentracing.Scope
|
||||
import io.opentracing.Span
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Subject
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class ScopeManagerTest extends Specification {
|
||||
def writer = new ListWriter()
|
||||
def tracer = new DDTracer(writer)
|
||||
|
||||
@Subject
|
||||
def scopeManager = tracer.scopeManager()
|
||||
|
||||
def cleanup() {
|
||||
scopeManager.tlsScope.remove()
|
||||
}
|
||||
|
||||
def "threadlocal is empty"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
builder.start()
|
||||
|
||||
expect:
|
||||
scopeManager.active() == null
|
||||
writer.empty
|
||||
}
|
||||
|
||||
def "threadlocal is active"() {
|
||||
when:
|
||||
def builder = tracer.buildSpan("test")
|
||||
builder.withTag("dd.use.ref.counting", false)
|
||||
def scope = builder.startActive(true)
|
||||
|
||||
then:
|
||||
!spanReported(scope.span())
|
||||
scopeManager.active() == scope
|
||||
scope instanceof ContextualScopeManager.ThreadLocalScope
|
||||
writer.empty
|
||||
|
||||
when:
|
||||
scope.close()
|
||||
|
||||
then:
|
||||
spanReported(scope.span())
|
||||
writer == [[scope.span()]]
|
||||
scopeManager.active() == null
|
||||
}
|
||||
|
||||
def "threadlocal is active with ref counting scope"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
builder.withReferenceCounting()
|
||||
def scope = builder.startActive(true)
|
||||
|
||||
expect:
|
||||
!spanReported(scope.span())
|
||||
scopeManager.active() == scope
|
||||
scope instanceof ContextualScopeManager.RefCountingScope
|
||||
|
||||
when:
|
||||
scope.close()
|
||||
|
||||
then:
|
||||
spanReported(scope.span())
|
||||
writer == [[scope.span()]]
|
||||
scopeManager.active() == null
|
||||
}
|
||||
|
||||
def "threadlocal is active with ref counting scope using tag"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
builder.withTag("dd.use.ref.counting", true)
|
||||
def scope = builder.startActive(true)
|
||||
|
||||
expect:
|
||||
!spanReported(scope.span())
|
||||
scopeManager.active() == scope
|
||||
scope instanceof ContextualScopeManager.RefCountingScope
|
||||
|
||||
when:
|
||||
scope.close()
|
||||
|
||||
then:
|
||||
spanReported(scope.span())
|
||||
writer == [[scope.span()]]
|
||||
scopeManager.active() == null
|
||||
}
|
||||
|
||||
def "ref counting scope doesn't close if non-zero"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
builder.withReferenceCounting()
|
||||
def scope = (ContextualScopeManager.RefCountingScope) builder.startActive(true)
|
||||
def continuation = scope.capture()
|
||||
|
||||
expect:
|
||||
!spanReported(scope.span())
|
||||
scopeManager.active() == scope
|
||||
scope instanceof ContextualScopeManager.RefCountingScope
|
||||
writer.empty
|
||||
|
||||
|
||||
when:
|
||||
scope.close()
|
||||
|
||||
then:
|
||||
!spanReported(scope.span())
|
||||
scopeManager.active() == null
|
||||
writer.empty
|
||||
|
||||
when:
|
||||
continuation.activate()
|
||||
|
||||
then:
|
||||
scopeManager.active() != null
|
||||
|
||||
when:
|
||||
scopeManager.active().close()
|
||||
|
||||
then:
|
||||
scopeManager.active() == null
|
||||
spanReported(scope.span())
|
||||
writer == [[scope.span()]]
|
||||
}
|
||||
|
||||
def "context takes control"() {
|
||||
setup:
|
||||
contexts.each {
|
||||
scopeManager.addScopeContext(it)
|
||||
}
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (AtomicReferenceScope) builder.startActive(true)
|
||||
|
||||
expect:
|
||||
scopeManager.tlsScope.get() == null
|
||||
scopeManager.active() == scope
|
||||
contexts[activeIndex].get() == scope.get()
|
||||
writer.empty
|
||||
|
||||
where:
|
||||
activeIndex | contexts
|
||||
0 | [new AtomicReferenceScope(true)]
|
||||
0 | [new AtomicReferenceScope(true), new AtomicReferenceScope(true)]
|
||||
1 | [new AtomicReferenceScope(false), new AtomicReferenceScope(true), new AtomicReferenceScope(true), new AtomicReferenceScope(false)]
|
||||
2 | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(true)]
|
||||
}
|
||||
|
||||
def "disabled context is ignored"() {
|
||||
setup:
|
||||
contexts.each {
|
||||
scopeManager.addScopeContext(it)
|
||||
}
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = builder.startActive(true)
|
||||
|
||||
expect:
|
||||
scopeManager.tlsScope.get() == scope
|
||||
scopeManager.active() == scope
|
||||
writer.empty
|
||||
contexts.each {
|
||||
assert it.get() == null
|
||||
} == contexts
|
||||
|
||||
where:
|
||||
contexts | _
|
||||
[] | _
|
||||
[new AtomicReferenceScope(false)] | _
|
||||
[new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _
|
||||
[new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _
|
||||
}
|
||||
|
||||
boolean spanReported(DDSpan span) {
|
||||
return span.durationNano != 0
|
||||
}
|
||||
|
||||
class AtomicReferenceScope extends AtomicReference<Span> implements ScopeContext, Scope {
|
||||
final boolean enabled
|
||||
|
||||
AtomicReferenceScope(boolean enabled) {
|
||||
this.enabled = enabled
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean inContext() {
|
||||
return enabled
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() {
|
||||
getAndSet(null).finish()
|
||||
}
|
||||
|
||||
@Override
|
||||
Span span() {
|
||||
return get()
|
||||
}
|
||||
|
||||
@Override
|
||||
Scope activate(Span span, boolean finishSpanOnClose) {
|
||||
set(span)
|
||||
return this
|
||||
}
|
||||
|
||||
@Override
|
||||
Scope active() {
|
||||
return get() == null ? null : this
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,7 +20,8 @@ class SpanFactory {
|
|||
"fakeType",
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
new DDTracer())
|
||||
new DDTracer(),
|
||||
false)
|
||||
return new DDSpan(timestampMicro, context)
|
||||
}
|
||||
|
||||
|
@ -38,7 +39,8 @@ class SpanFactory {
|
|||
"fakeType",
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
tracer)
|
||||
tracer,
|
||||
false)
|
||||
return new DDSpan(1, context)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,7 +66,8 @@ class RateByServiceSamplerTest extends Specification {
|
|||
"fakeType",
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
new DDTracer())
|
||||
new DDTracer(),
|
||||
false)
|
||||
context.setTag("env", envName)
|
||||
return new DDSpan(0l, context)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue