Add support for partial trace flush

Add a configuration option and logic to flush purtial traces to the
agent if ongoing trace becomes too large.
This commit is contained in:
Nikolay Martynov 2019-01-16 13:52:11 -05:00
parent 75e0862566
commit 2a7cebc311
5 changed files with 191 additions and 15 deletions

View File

@ -41,6 +41,7 @@ public class Config {
public static final String SPAN_TAGS = "trace.span.tags";
public static final String JMX_TAGS = "trace.jmx.tags";
public static final String HEADER_TAGS = "trace.header.tags";
public static final String PARTIAL_FLUSH_MIN_SPANS = "trace.partial.flush.min.spans";
public static final String RUNTIME_CONTEXT_FIELD_INJECTION =
"trace.runtime.context.field.injection";
public static final String JMX_FETCH_ENABLED = "jmxfetch.enabled";
@ -69,6 +70,7 @@ public class Config {
private static final boolean DEFAULT_PRIORITY_SAMPLING_ENABLED = true;
private static final boolean DEFAULT_TRACE_RESOLVER_ENABLED = true;
private static final int DEFAULT_MAX_TRACE_SIZE_BEFORE_PARTIAL_FLUSH = 0;
private static final boolean DEFAULT_JMX_FETCH_ENABLED = false;
public static final int DEFAULT_JMX_FETCH_STATSD_PORT = 8125;
@ -90,6 +92,7 @@ public class Config {
private final Map<String, String> spanTags;
private final Map<String, String> jmxTags;
@Getter private final Map<String, String> headerTags;
@Getter private final Integer partialFlushMinSpans;
@Getter private final boolean runtimeContextFieldInjection;
@Getter private final boolean jmxFetchEnabled;
@Getter private final List<String> jmxFetchMetricsConfigs;
@ -122,6 +125,10 @@ public class Config {
jmxTags = getMapSettingFromEnvironment(JMX_TAGS, null);
headerTags = getMapSettingFromEnvironment(HEADER_TAGS, null);
partialFlushMinSpans =
getIntegerSettingFromEnvironment(
PARTIAL_FLUSH_MIN_SPANS, DEFAULT_MAX_TRACE_SIZE_BEFORE_PARTIAL_FLUSH);
runtimeContextFieldInjection =
getBooleanSettingFromEnvironment(
RUNTIME_CONTEXT_FIELD_INJECTION, DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION);
@ -163,6 +170,9 @@ public class Config {
jmxTags = getPropertyMapValue(properties, JMX_TAGS, parent.jmxTags);
headerTags = getPropertyMapValue(properties, HEADER_TAGS, parent.headerTags);
partialFlushMinSpans =
getPropertyIntegerValue(properties, PARTIAL_FLUSH_MIN_SPANS, parent.partialFlushMinSpans);
runtimeContextFieldInjection =
getPropertyBooleanValue(
properties, RUNTIME_CONTEXT_FIELD_INJECTION, parent.runtimeContextFieldInjection);

View File

@ -37,6 +37,7 @@ class ConfigTest extends Specification {
config.mergedSpanTags == [:]
config.mergedJmxTags == [(RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE]
config.headerTags == [:]
config.partialFlushMinSpans == 0
config.runtimeContextFieldInjection == true
config.jmxFetchEnabled == false
config.jmxFetchMetricsConfigs == []
@ -61,6 +62,7 @@ class ConfigTest extends Specification {
System.setProperty(PREFIX + SPAN_TAGS, "c:3")
System.setProperty(PREFIX + JMX_TAGS, "d:4")
System.setProperty(PREFIX + HEADER_TAGS, "e:5")
System.setProperty(PREFIX + PARTIAL_FLUSH_MIN_SPANS, "15")
System.setProperty(PREFIX + RUNTIME_CONTEXT_FIELD_INJECTION, "false")
System.setProperty(PREFIX + JMX_FETCH_ENABLED, "true")
System.setProperty(PREFIX + JMX_FETCH_METRICS_CONFIGS, "/foo.yaml,/bar.yaml")
@ -83,6 +85,7 @@ class ConfigTest extends Specification {
config.mergedSpanTags == [b: "2", c: "3"]
config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE]
config.headerTags == [e: "5"]
config.partialFlushMinSpans == 15
config.runtimeContextFieldInjection == false
config.jmxFetchEnabled == true
config.jmxFetchMetricsConfigs == ["/foo.yaml", "/bar.yaml"]
@ -184,6 +187,7 @@ class ConfigTest extends Specification {
properties.setProperty(SPAN_TAGS, "c:3")
properties.setProperty(JMX_TAGS, "d:4")
properties.setProperty(HEADER_TAGS, "e:5")
properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "15")
properties.setProperty(JMX_FETCH_METRICS_CONFIGS, "/foo.yaml,/bar.yaml")
properties.setProperty(JMX_FETCH_CHECK_PERIOD, "100")
properties.setProperty(JMX_FETCH_REFRESH_BEANS_PERIOD, "200")
@ -204,6 +208,7 @@ class ConfigTest extends Specification {
config.mergedSpanTags == [b: "2", c: "3"]
config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE]
config.headerTags == [e: "5"]
config.partialFlushMinSpans == 15
config.jmxFetchMetricsConfigs == ["/foo.yaml", "/bar.yaml"]
config.jmxFetchCheckPeriod == 100
config.jmxFetchRefreshBeansPeriod == 200

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/** DDTracer makes it easy to send traces and span to DD using the OpenTracing API. */
@ -61,6 +62,10 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
private final Map<String, String> defaultSpanTags;
/** A configured mapping of service names to update with new values */
private final Map<String, String> serviceNameMappings;
/** number of spans in a pending trace before they get flushed */
@Getter private final int maxTraceSizeBeforePartialFlush;
/**
* JVM shutdown callback, keeping a reference to it to remove this if DDTracer gets destroyed
* earlier
@ -113,7 +118,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
config.getRuntimeTags(),
config.getMergedSpanTags(),
config.getServiceMapping(),
config.getHeaderTags());
config.getHeaderTags(),
config.getPartialFlushMinSpans());
log.debug("Using config: {}", config);
}
@ -130,7 +136,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
runtimeTags,
Collections.<String, String>emptyMap(),
Collections.<String, String>emptyMap(),
Collections.<String, String>emptyMap());
Collections.<String, String>emptyMap(),
0);
}
public DDTracer(final Writer writer) {
@ -145,10 +152,13 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
config.getRuntimeTags(),
config.getMergedSpanTags(),
config.getServiceMapping(),
config.getHeaderTags());
config.getHeaderTags(),
config.getPartialFlushMinSpans());
}
/** @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map)} instead. */
/**
* @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map, int)} instead.
*/
@Deprecated
public DDTracer(
final String serviceName,
@ -165,15 +175,31 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
customRuntimeTags(runtimeId),
defaultSpanTags,
serviceNameMappings,
taggedHeaders);
taggedHeaders,
defaultMaxTraceSizeBeforePartialFlush());
}
/**
* @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map, int)} instead.
*/
@Deprecated
private static Map<String, String> customRuntimeTags(final String runtimeId) {
final Map<String, String> runtimeTags = new HashMap<>();
runtimeTags.putAll(Config.get().getRuntimeTags());
runtimeTags.put(Config.RUNTIME_ID_TAG, runtimeId);
return Collections.unmodifiableMap(runtimeTags);
public DDTracer(
final String serviceName,
final Writer writer,
final Sampler sampler,
final Map<String, String> runtimeTags,
final Map<String, String> defaultSpanTags,
final Map<String, String> serviceNameMappings,
final Map<String, String> taggedHeaders) {
this(
serviceName,
writer,
sampler,
runtimeTags,
defaultSpanTags,
serviceNameMappings,
taggedHeaders,
defaultMaxTraceSizeBeforePartialFlush());
}
public DDTracer(
@ -183,7 +209,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
final Map<String, String> runtimeTags,
final Map<String, String> defaultSpanTags,
final Map<String, String> serviceNameMappings,
final Map<String, String> taggedHeaders) {
final Map<String, String> taggedHeaders,
final int maxTraceSizeBeforePartialFlush) {
assert runtimeTags != null;
assert defaultSpanTags != null;
assert serviceNameMappings != null;
@ -196,6 +223,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
this.defaultSpanTags = defaultSpanTags;
this.runtimeTags = runtimeTags;
this.serviceNameMappings = serviceNameMappings;
this.maxTraceSizeBeforePartialFlush = maxTraceSizeBeforePartialFlush;
shutdownCallback =
new Thread() {
@ -330,7 +358,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
*
* @param trace a list of the spans related to the same trace
*/
void write(final PendingTrace trace) {
void write(final Collection<DDSpan> trace) {
if (trace.isEmpty()) {
return;
}
@ -350,6 +378,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
}
}
incrementTraceCount();
// TODO: current trace implementation doesn't guarantee that first span is the root span
// We may want to reconsider way this check is done.
if (!writtenTrace.isEmpty() && sampler.sample(writtenTrace.get(0))) {
writer.write(writtenTrace);
}
@ -384,8 +414,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
}
@Override
public void addScopeListener(ScopeListener listener) {
this.scopeManager.addScopeListener(listener);
public void addScopeListener(final ScopeListener listener) {
scopeManager.addScopeListener(listener);
}
@Override
@ -409,6 +439,19 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
+ '}';
}
@Deprecated
private static Map<String, String> customRuntimeTags(final String runtimeId) {
final Map<String, String> runtimeTags = new HashMap<>();
runtimeTags.putAll(Config.get().getRuntimeTags());
runtimeTags.put(Config.RUNTIME_ID_TAG, runtimeId);
return Collections.unmodifiableMap(runtimeTags);
}
@Deprecated
private static int defaultMaxTraceSizeBeforePartialFlush() {
return Config.get().getPartialFlushMinSpans();
}
private static class CodecRegistry {
private final Map<Format<?>, Codec<?>> codecs = new HashMap<>();

View File

@ -6,7 +6,10 @@ import java.io.Closeable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -195,11 +198,31 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
final int count = pendingReferenceCount.decrementAndGet();
if (count == 0) {
write();
} else {
if (tracer.getMaxTraceSizeBeforePartialFlush() > 0
&& size() > tracer.getMaxTraceSizeBeforePartialFlush()) {
synchronized (this) {
if (size() > tracer.getMaxTraceSizeBeforePartialFlush()) {
final DDSpan rootSpan = getRootSpan();
final List<DDSpan> partialTrace = new ArrayList(size());
final Iterator<DDSpan> it = iterator();
while (it.hasNext()) {
final DDSpan span = it.next();
if (span != rootSpan) {
partialTrace.add(span);
it.remove();
}
}
log.debug("Writing partial trace {} of size {}", traceId, partialTrace.size());
tracer.write(partialTrace);
}
}
}
}
log.debug("traceId: {} -- Expired reference. count = {}", traceId, count);
}
private void write() {
private synchronized void write() {
if (isWritten.compareAndSet(false, true)) {
SPAN_CLEANER.pendingTraces.remove(this);
if (!isEmpty()) {

View File

@ -1,6 +1,7 @@
package datadog.opentracing
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.Config
import datadog.trace.common.writer.ListWriter
import spock.lang.Specification
import spock.lang.Subject
@ -9,6 +10,8 @@ import spock.lang.Timeout
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import static datadog.trace.api.Config.PARTIAL_FLUSH_MIN_SPANS
class PendingTraceTest extends Specification {
def writer = new ListWriter()
def tracer = new DDTracer(writer)
@ -179,4 +182,96 @@ class PendingTraceTest extends Specification {
// Generous 5 seconds to execute this test
Math.abs(TimeUnit.NANOSECONDS.toSeconds(trace.currentTimeNano) - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())) < 5
}
def "partial flush"() {
when:
def properties = new Properties()
properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "1")
def config = Config.get(properties)
def tracer = new DDTracer(config, writer)
def trace = new PendingTrace(tracer, traceIdStr, [:])
def rootSpan = SpanFactory.newSpanOf(trace)
def child1 = tracer.buildSpan("child1").asChildOf(rootSpan).start()
def child2 = tracer.buildSpan("child2").asChildOf(rootSpan).start()
then:
trace.pendingReferenceCount.get() == 3
trace.weakReferences.size() == 3
when:
rootSpan.finish()
then:
trace.pendingReferenceCount.get() == 2
trace.weakReferences.size() == 2
trace.asList() == [rootSpan]
writer == []
tracer.traceCount.get() == 0
when:
child1.finish()
then:
trace.pendingReferenceCount.get() == 1
trace.weakReferences.size() == 1
trace.asList() == [rootSpan]
writer == [[child1]]
tracer.traceCount.get() == 1
when:
child2.finish()
then:
trace.pendingReferenceCount.get() == 0
trace.weakReferences.size() == 0
trace.asList() == [child2, rootSpan]
writer == [[child1], [child2, rootSpan]]
tracer.traceCount.get() == 2
}
def "partial flush with root span closed last"() {
when:
def properties = new Properties()
properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "1")
def config = Config.get(properties)
def tracer = new DDTracer(config, writer)
def trace = new PendingTrace(tracer, traceIdStr, [:])
def rootSpan = SpanFactory.newSpanOf(trace)
def child1 = tracer.buildSpan("child1").asChildOf(rootSpan).start()
def child2 = tracer.buildSpan("child2").asChildOf(rootSpan).start()
then:
trace.pendingReferenceCount.get() == 3
trace.weakReferences.size() == 3
when:
child1.finish()
then:
trace.pendingReferenceCount.get() == 2
trace.weakReferences.size() == 2
trace.asList() == [child1]
writer == []
tracer.traceCount.get() == 0
when:
child2.finish()
then:
trace.pendingReferenceCount.get() == 1
trace.weakReferences.size() == 1
trace.asList() == []
writer == [[child2, child1]]
tracer.traceCount.get() == 1
when:
rootSpan.finish()
then:
trace.pendingReferenceCount.get() == 0
trace.weakReferences.size() == 0
trace.asList() == [rootSpan]
writer == [[child2, child1], [rootSpan]]
tracer.traceCount.get() == 2
}
}