diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java index eb37968aa1..7262c667ca 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java @@ -58,6 +58,8 @@ public class DDSpanContext implements io.opentracing.SpanContext { /** Tags are associated to the current span, they will not propagate to the children span */ private Map tags; + @JsonIgnore public final boolean useRefCounting; + public DDSpanContext( final long traceId, final long spanId, @@ -71,7 +73,8 @@ public class DDSpanContext implements io.opentracing.SpanContext { final String spanType, final Map tags, final Queue trace, - final DDTracer tracer) { + final DDTracer tracer, + final boolean useRefCounting) { this.traceId = traceId; this.spanId = spanId; @@ -92,6 +95,8 @@ public class DDSpanContext implements io.opentracing.SpanContext { this.tags = tags; + this.useRefCounting = useRefCounting; + if (trace == null) { // TODO: figure out better concurrency model. this.trace = new ConcurrentLinkedQueue<>(); diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index f52f0fb522..0277ec96b0 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -6,6 +6,8 @@ import datadog.opentracing.decorators.AbstractDecorator; import datadog.opentracing.decorators.DDDecoratorsFactory; import datadog.opentracing.propagation.Codec; import datadog.opentracing.propagation.HTTPCodec; +import datadog.opentracing.scopemanager.ContextualScopeManager; +import datadog.opentracing.scopemanager.ScopeContext; import datadog.trace.api.DDTags; import datadog.trace.common.DDTraceConfig; import datadog.trace.common.Service; @@ -21,7 +23,6 @@ import io.opentracing.ScopeManager; import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.propagation.Format; -import io.opentracing.util.ThreadLocalScopeManager; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -29,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; import lombok.extern.slf4j.Slf4j; @@ -44,16 +48,17 @@ public class DDTracer implements io.opentracing.Tracer { final Writer writer; /** Sampler defines the sampling policy in order to reduce the number of traces for instance */ final Sampler sampler; - /** - * Scope manager is in charge of managing the scopes from which spans are created - */ - final ScopeManager scopeManager; + /** Scope manager is in charge of managing the scopes from which spans are created */ + final ContextualScopeManager scopeManager = new ContextualScopeManager(); /** A set of tags that are added to every span */ private final Map spanTags; /** Span context decorators */ - private final Map> spanContextDecorators = new HashMap<>(); + private final Map> spanContextDecorators = + new ConcurrentHashMap<>(); + + private final Set contextualScopeManagers = new ConcurrentSkipListSet<>(); private final CodecRegistry registry; private final Map services = new HashMap<>(); @@ -72,8 +77,7 @@ public class DDTracer implements io.opentracing.Tracer { config.getProperty(DDTraceConfig.SERVICE_NAME), Writer.Builder.forConfig(config), Sampler.Builder.forConfig(config), - new ThreadLocalScopeManager(), - DDTraceConfig.parseMap(config.getProperty(DDTraceConfig.SPAN_TAGS))); + DDTraceConfig.parseMap(config.getProperty(DDTraceConfig.SPAN_TAGS))); log.debug("Using config: {}", config); // Create decorators from resource files @@ -85,20 +89,18 @@ public class DDTracer implements io.opentracing.Tracer { } public DDTracer(final String serviceName, final Writer writer, final Sampler sampler) { - this(serviceName, writer, sampler, new ThreadLocalScopeManager(), Collections.emptyMap()); + this(serviceName, writer, sampler, Collections.emptyMap()); } public DDTracer( final String serviceName, final Writer writer, final Sampler sampler, - final ScopeManager scopeManager, final Map spanTags) { this.serviceName = serviceName; this.writer = writer; this.writer.start(); this.sampler = sampler; - this.scopeManager = scopeManager; this.spanTags = spanTags; registry = new CodecRegistry(); @@ -116,8 +118,7 @@ public class DDTracer implements io.opentracing.Tracer { UNASSIGNED_DEFAULT_SERVICE_NAME, writer, new AllSampler(), - new ThreadLocalScopeManager(), - DDTraceConfig.parseMap(new DDTraceConfig().getProperty(DDTraceConfig.SPAN_TAGS))); + DDTraceConfig.parseMap(new DDTraceConfig().getProperty(DDTraceConfig.SPAN_TAGS))); } /** @@ -145,8 +146,12 @@ public class DDTracer implements io.opentracing.Tracer { spanContextDecorators.put(decorator.getMatchingTag(), list); } + public void addScopeContext(final ScopeContext context) { + scopeManager.addScopeContext(context); + } + @Override - public ScopeManager scopeManager() { + public ContextualScopeManager scopeManager() { return scopeManager; } @@ -274,6 +279,7 @@ public class DDTracer implements io.opentracing.Tracer { private boolean errorFlag; private String spanType; private boolean ignoreScope = false; + private boolean useRefCounting = false; public DDSpanBuilder(final String operationName, final ScopeManager scopeManager) { this.operationName = operationName; @@ -335,7 +341,11 @@ public class DDTracer implements io.opentracing.Tracer { @Override public DDSpanBuilder withTag(final String tag, final boolean bool) { - return withTag(tag, (Object) bool); + if (bool && tag.equals("dd.use.ref.counting")) { + return withReferenceCounting(); + } else { + return withTag(tag, (Object) bool); + } } @Override @@ -359,6 +369,11 @@ public class DDTracer implements io.opentracing.Tracer { return this; } + public DDSpanBuilder withReferenceCounting() { + this.useRefCounting = true; + return this; + } + public DDSpanBuilder withSpanType(final String spanType) { this.spanType = spanType; return this; @@ -466,7 +481,8 @@ public class DDTracer implements io.opentracing.Tracer { spanType, this.tags, parentTrace, - DDTracer.this); + DDTracer.this, + useRefCounting); return context; } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java index a7a76e92d4..c59d4868b3 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java @@ -72,7 +72,8 @@ public class HTTPCodec implements Codec { null, null, null, - null); + null, + false); context.lockSamplingPriority(); log.debug("{} - Parent context extracted", context); diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java new file mode 100644 index 0000000000..6e4b8bdabf --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java @@ -0,0 +1,127 @@ +package datadog.opentracing.scopemanager; + +import datadog.opentracing.DDSpan; +import io.opentracing.Scope; +import io.opentracing.ScopeManager; +import io.opentracing.Span; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + +public class ContextualScopeManager implements ScopeManager { + final ThreadLocal tlsScope = new ThreadLocal<>(); + final Set scopeContexts = new CopyOnWriteArraySet<>(); + + @Override + public Scope activate(final Span span, final boolean finishOnClose) { + for (final ScopeContext context : scopeContexts) { + if (context.inContext()) { + return context.activate(span, finishOnClose); + } + } + if (span instanceof DDSpan && ((DDSpan) span).context().useRefCounting) { + return new RefCountingScope(this, new AtomicInteger(1), span); + } else { + return new ThreadLocalScope(this, span, finishOnClose); + } + } + + @Override + public Scope active() { + for (final ScopeContext csm : scopeContexts) { + if (csm.inContext()) { + return csm.active(); + } + } + return tlsScope.get(); + } + + public void addScopeContext(final ScopeContext context) { + scopeContexts.add(context); + } + + class ThreadLocalScope implements Scope { + private final ContextualScopeManager scopeManager; + private final Span wrapped; + private final boolean finishOnClose; + private final Scope toRestore; + + ThreadLocalScope( + final ContextualScopeManager scopeManager, + final Span wrapped, + final boolean finishOnClose) { + this.scopeManager = scopeManager; + this.wrapped = wrapped; + this.finishOnClose = finishOnClose; + this.toRestore = scopeManager.tlsScope.get(); + scopeManager.tlsScope.set(this); + } + + @Override + public void close() { + if (scopeManager.tlsScope.get() != this) { + // This shouldn't happen if users call methods in the expected order. Bail out. + return; + } + + if (finishOnClose) { + wrapped.finish(); + } + + scopeManager.tlsScope.set(toRestore); + } + + @Override + public Span span() { + return wrapped; + } + } + + public class RefCountingScope implements Scope { + final ContextualScopeManager manager; + final AtomicInteger refCount; + private final Span wrapped; + private final Scope toRestore; + + RefCountingScope( + final ContextualScopeManager manager, final AtomicInteger refCount, final Span wrapped) { + this.manager = manager; + this.refCount = refCount; + this.wrapped = wrapped; + this.toRestore = manager.tlsScope.get(); + manager.tlsScope.set(this); + } + + public class Continuation { + public Continuation() { + refCount.incrementAndGet(); + } + + public RefCountingScope activate() { + return new RefCountingScope(manager, refCount, wrapped); + } + } + + public RefCountingScope.Continuation capture() { + return new RefCountingScope.Continuation(); + } + + @Override + public void close() { + if (manager.tlsScope.get() != this) { + return; + } + + if (refCount.decrementAndGet() == 0) { + wrapped.finish(); + } + + manager.tlsScope.set(toRestore); + } + + @Override + public Span span() { + return wrapped; + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ScopeContext.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ScopeContext.java new file mode 100644 index 0000000000..37786b0b8d --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ScopeContext.java @@ -0,0 +1,14 @@ +package datadog.opentracing.scopemanager; + +import io.opentracing.ScopeManager; + +/** Represents a ScopeManager that is only valid in certain cases such as on a specific thread. */ +public interface ScopeContext extends ScopeManager { + + /** + * When multiple ScopeContexts are active, the first one to respond true will have control. + * + * @return true if this ScopeContext should be active + */ + boolean inContext(); +} diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy index 38be3314ac..b0df839408 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy @@ -49,7 +49,8 @@ class DDSpanSerializationTest extends Specification { "type", tags, null, - null) + null, + false) baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName()) baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId())) diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/decorators/URLAsResourceNameTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/decorators/URLAsResourceNameTest.groovy index 0307d8daec..43aa470e98 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/decorators/URLAsResourceNameTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/decorators/URLAsResourceNameTest.groovy @@ -96,7 +96,8 @@ class URLAsResourceNameTest extends Specification { "fakeType", tags, null, - null) + null, + false) then: decorator.afterSetTag(context, Tags.HTTP_URL.getKey(), value) diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/propagation/HTTPCodecTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/propagation/HTTPCodecTest.groovy index 35fffa230c..a634c3fcd5 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/propagation/HTTPCodecTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/propagation/HTTPCodecTest.groovy @@ -25,24 +25,25 @@ class HTTPCodecTest extends Specification { setup: final DDSpanContext mockedContext = new DDSpanContext( - 1L, - 2L, - 0L, - "fakeService", - "fakeOperation", - "fakeResource", - samplingPriority, - new HashMap() { - { - put("k1", "v1") - put("k2", "v2") - } - }, - false, - "fakeType", - null, - null, - null) + 1L, + 2L, + 0L, + "fakeService", + "fakeOperation", + "fakeResource", + samplingPriority, + new HashMap() { + { + put("k1", "v1") + put("k2", "v2") + } + }, + false, + "fakeType", + null, + null, + null, + false) final Map carrier = new HashMap<>() diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy new file mode 100644 index 0000000000..6efd8d26c8 --- /dev/null +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -0,0 +1,215 @@ +package datadog.opentracing.scopemanager + +import datadog.opentracing.DDSpan +import datadog.opentracing.DDTracer +import datadog.trace.common.writer.ListWriter +import io.opentracing.Scope +import io.opentracing.Span +import spock.lang.Specification +import spock.lang.Subject + +import java.util.concurrent.atomic.AtomicReference + +class ScopeManagerTest extends Specification { + def writer = new ListWriter() + def tracer = new DDTracer(writer) + + @Subject + def scopeManager = tracer.scopeManager() + + def cleanup() { + scopeManager.tlsScope.remove() + } + + def "threadlocal is empty"() { + setup: + def builder = tracer.buildSpan("test") + builder.start() + + expect: + scopeManager.active() == null + writer.empty + } + + def "threadlocal is active"() { + when: + def builder = tracer.buildSpan("test") + builder.withTag("dd.use.ref.counting", false) + def scope = builder.startActive(true) + + then: + !spanReported(scope.span()) + scopeManager.active() == scope + scope instanceof ContextualScopeManager.ThreadLocalScope + writer.empty + + when: + scope.close() + + then: + spanReported(scope.span()) + writer == [[scope.span()]] + scopeManager.active() == null + } + + def "threadlocal is active with ref counting scope"() { + setup: + def builder = tracer.buildSpan("test") + builder.withReferenceCounting() + def scope = builder.startActive(true) + + expect: + !spanReported(scope.span()) + scopeManager.active() == scope + scope instanceof ContextualScopeManager.RefCountingScope + + when: + scope.close() + + then: + spanReported(scope.span()) + writer == [[scope.span()]] + scopeManager.active() == null + } + + def "threadlocal is active with ref counting scope using tag"() { + setup: + def builder = tracer.buildSpan("test") + builder.withTag("dd.use.ref.counting", true) + def scope = builder.startActive(true) + + expect: + !spanReported(scope.span()) + scopeManager.active() == scope + scope instanceof ContextualScopeManager.RefCountingScope + + when: + scope.close() + + then: + spanReported(scope.span()) + writer == [[scope.span()]] + scopeManager.active() == null + } + + def "ref counting scope doesn't close if non-zero"() { + setup: + def builder = tracer.buildSpan("test") + builder.withReferenceCounting() + def scope = (ContextualScopeManager.RefCountingScope) builder.startActive(true) + def continuation = scope.capture() + + expect: + !spanReported(scope.span()) + scopeManager.active() == scope + scope instanceof ContextualScopeManager.RefCountingScope + writer.empty + + + when: + scope.close() + + then: + !spanReported(scope.span()) + scopeManager.active() == null + writer.empty + + when: + continuation.activate() + + then: + scopeManager.active() != null + + when: + scopeManager.active().close() + + then: + scopeManager.active() == null + spanReported(scope.span()) + writer == [[scope.span()]] + } + + def "context takes control"() { + setup: + contexts.each { + scopeManager.addScopeContext(it) + } + def builder = tracer.buildSpan("test") + def scope = (AtomicReferenceScope) builder.startActive(true) + + expect: + scopeManager.tlsScope.get() == null + scopeManager.active() == scope + contexts[activeIndex].get() == scope.get() + writer.empty + + where: + activeIndex | contexts + 0 | [new AtomicReferenceScope(true)] + 0 | [new AtomicReferenceScope(true), new AtomicReferenceScope(true)] + 1 | [new AtomicReferenceScope(false), new AtomicReferenceScope(true), new AtomicReferenceScope(true), new AtomicReferenceScope(false)] + 2 | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(true)] + } + + def "disabled context is ignored"() { + setup: + contexts.each { + scopeManager.addScopeContext(it) + } + def builder = tracer.buildSpan("test") + def scope = builder.startActive(true) + + expect: + scopeManager.tlsScope.get() == scope + scopeManager.active() == scope + writer.empty + contexts.each { + assert it.get() == null + } == contexts + + where: + contexts | _ + [] | _ + [new AtomicReferenceScope(false)] | _ + [new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _ + [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _ + } + + boolean spanReported(DDSpan span) { + return span.durationNano != 0 + } + + class AtomicReferenceScope extends AtomicReference implements ScopeContext, Scope { + final boolean enabled + + AtomicReferenceScope(boolean enabled) { + this.enabled = enabled + } + + @Override + boolean inContext() { + return enabled + } + + @Override + void close() { + getAndSet(null).finish() + } + + @Override + Span span() { + return get() + } + + @Override + Scope activate(Span span, boolean finishSpanOnClose) { + set(span) + return this + } + + @Override + Scope active() { + return get() == null ? null : this + } + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/SpanFactory.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/SpanFactory.groovy index 9f901536f7..88cc46aaec 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/SpanFactory.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/SpanFactory.groovy @@ -20,7 +20,8 @@ class SpanFactory { "fakeType", Collections.emptyMap(), null, - new DDTracer()) + new DDTracer(), + false) return new DDSpan(timestampMicro, context) } @@ -38,7 +39,8 @@ class SpanFactory { "fakeType", Collections.emptyMap(), null, - tracer) + tracer, + false) return new DDSpan(1, context) } } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy index ef4480d52c..c63ea34886 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy @@ -66,7 +66,8 @@ class RateByServiceSamplerTest extends Specification { "fakeType", Collections.emptyMap(), null, - new DDTracer()) + new DDTracer(), + false) context.setTag("env", envName) return new DDSpan(0l, context) }