From 2a7cebc31124a0bae41cc2a0afac5465c94818e3 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Wed, 16 Jan 2019 13:52:11 -0500 Subject: [PATCH] Add support for partial trace flush Add a configuration option and logic to flush partial traces to the agent if an ongoing trace becomes too large. --- .../main/java/datadog/trace/api/Config.java | 10 ++ .../datadog/trace/api/ConfigTest.groovy | 5 + .../java/datadog/opentracing/DDTracer.java | 71 +++++++++++--- .../datadog/opentracing/PendingTrace.java | 25 ++++- .../opentracing/PendingTraceTest.groovy | 95 +++++++++++++++++++ 5 files changed, 191 insertions(+), 15 deletions(-) diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java index 1c9a84347e..3f29ca454e 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java @@ -41,6 +41,7 @@ public class Config { public static final String SPAN_TAGS = "trace.span.tags"; public static final String JMX_TAGS = "trace.jmx.tags"; public static final String HEADER_TAGS = "trace.header.tags"; + public static final String PARTIAL_FLUSH_MIN_SPANS = "trace.partial.flush.min.spans"; public static final String RUNTIME_CONTEXT_FIELD_INJECTION = "trace.runtime.context.field.injection"; public static final String JMX_FETCH_ENABLED = "jmxfetch.enabled"; @@ -69,6 +70,7 @@ public class Config { private static final boolean DEFAULT_PRIORITY_SAMPLING_ENABLED = true; private static final boolean DEFAULT_TRACE_RESOLVER_ENABLED = true; + private static final int DEFAULT_MAX_TRACE_SIZE_BEFORE_PARTIAL_FLUSH = 0; private static final boolean DEFAULT_JMX_FETCH_ENABLED = false; public static final int DEFAULT_JMX_FETCH_STATSD_PORT = 8125; @@ -90,6 +92,7 @@ public class Config { private final Map spanTags; private final Map jmxTags; @Getter private final Map headerTags; + @Getter private final Integer partialFlushMinSpans; @Getter private final boolean 
runtimeContextFieldInjection; @Getter private final boolean jmxFetchEnabled; @Getter private final List jmxFetchMetricsConfigs; @@ -122,6 +125,10 @@ public class Config { jmxTags = getMapSettingFromEnvironment(JMX_TAGS, null); headerTags = getMapSettingFromEnvironment(HEADER_TAGS, null); + partialFlushMinSpans = + getIntegerSettingFromEnvironment( + PARTIAL_FLUSH_MIN_SPANS, DEFAULT_MAX_TRACE_SIZE_BEFORE_PARTIAL_FLUSH); + runtimeContextFieldInjection = getBooleanSettingFromEnvironment( RUNTIME_CONTEXT_FIELD_INJECTION, DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION); @@ -163,6 +170,9 @@ public class Config { jmxTags = getPropertyMapValue(properties, JMX_TAGS, parent.jmxTags); headerTags = getPropertyMapValue(properties, HEADER_TAGS, parent.headerTags); + partialFlushMinSpans = + getPropertyIntegerValue(properties, PARTIAL_FLUSH_MIN_SPANS, parent.partialFlushMinSpans); + runtimeContextFieldInjection = getPropertyBooleanValue( properties, RUNTIME_CONTEXT_FIELD_INJECTION, parent.runtimeContextFieldInjection); diff --git a/dd-trace-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy b/dd-trace-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy index 02bc41ff13..a1ef086be9 100644 --- a/dd-trace-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy +++ b/dd-trace-api/src/test/groovy/datadog/trace/api/ConfigTest.groovy @@ -37,6 +37,7 @@ class ConfigTest extends Specification { config.mergedSpanTags == [:] config.mergedJmxTags == [(RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE] config.headerTags == [:] + config.partialFlushMinSpans == 0 config.runtimeContextFieldInjection == true config.jmxFetchEnabled == false config.jmxFetchMetricsConfigs == [] @@ -61,6 +62,7 @@ class ConfigTest extends Specification { System.setProperty(PREFIX + SPAN_TAGS, "c:3") System.setProperty(PREFIX + JMX_TAGS, "d:4") System.setProperty(PREFIX + HEADER_TAGS, "e:5") + System.setProperty(PREFIX + PARTIAL_FLUSH_MIN_SPANS, "15") 
System.setProperty(PREFIX + RUNTIME_CONTEXT_FIELD_INJECTION, "false") System.setProperty(PREFIX + JMX_FETCH_ENABLED, "true") System.setProperty(PREFIX + JMX_FETCH_METRICS_CONFIGS, "/foo.yaml,/bar.yaml") @@ -83,6 +85,7 @@ class ConfigTest extends Specification { config.mergedSpanTags == [b: "2", c: "3"] config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE] config.headerTags == [e: "5"] + config.partialFlushMinSpans == 15 config.runtimeContextFieldInjection == false config.jmxFetchEnabled == true config.jmxFetchMetricsConfigs == ["/foo.yaml", "/bar.yaml"] @@ -184,6 +187,7 @@ class ConfigTest extends Specification { properties.setProperty(SPAN_TAGS, "c:3") properties.setProperty(JMX_TAGS, "d:4") properties.setProperty(HEADER_TAGS, "e:5") + properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "15") properties.setProperty(JMX_FETCH_METRICS_CONFIGS, "/foo.yaml,/bar.yaml") properties.setProperty(JMX_FETCH_CHECK_PERIOD, "100") properties.setProperty(JMX_FETCH_REFRESH_BEANS_PERIOD, "200") @@ -204,6 +208,7 @@ class ConfigTest extends Specification { config.mergedSpanTags == [b: "2", c: "3"] config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE): config.serviceName, (LANGUAGE_TAG_KEY): LANGUAGE_TAG_VALUE] config.headerTags == [e: "5"] + config.partialFlushMinSpans == 15 config.jmxFetchMetricsConfigs == ["/foo.yaml", "/bar.yaml"] config.jmxFetchCheckPeriod == 100 config.jmxFetchRefreshBeansPeriod == 200 diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index d3660cc416..8ce78609ab 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import 
java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** DDTracer makes it easy to send traces and span to DD using the OpenTracing API. */ @@ -61,6 +62,10 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace private final Map defaultSpanTags; /** A configured mapping of service names to update with new values */ private final Map serviceNameMappings; + + /** number of spans in a pending trace before they get flushed */ + @Getter private final int maxTraceSizeBeforePartialFlush; + /** * JVM shutdown callback, keeping a reference to it to remove this if DDTracer gets destroyed * earlier @@ -113,7 +118,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace config.getRuntimeTags(), config.getMergedSpanTags(), config.getServiceMapping(), - config.getHeaderTags()); + config.getHeaderTags(), + config.getPartialFlushMinSpans()); log.debug("Using config: {}", config); } @@ -130,7 +136,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace runtimeTags, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap(), + 0); } public DDTracer(final Writer writer) { @@ -145,10 +152,13 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace config.getRuntimeTags(), config.getMergedSpanTags(), config.getServiceMapping(), - config.getHeaderTags()); + config.getHeaderTags(), + config.getPartialFlushMinSpans()); } - /** @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map)} instead. */ + /** + * @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map, int)} instead. 
+ */ @Deprecated public DDTracer( final String serviceName, @@ -165,15 +175,31 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace customRuntimeTags(runtimeId), defaultSpanTags, serviceNameMappings, - taggedHeaders); + taggedHeaders, + defaultMaxTraceSizeBeforePartialFlush()); } + /** + * @Deprecated. Use {@link #DDTracer(String, Writer, Sampler, Map, Map, Map, Map, int)} instead. + */ @Deprecated - private static Map customRuntimeTags(final String runtimeId) { - final Map runtimeTags = new HashMap<>(); - runtimeTags.putAll(Config.get().getRuntimeTags()); - runtimeTags.put(Config.RUNTIME_ID_TAG, runtimeId); - return Collections.unmodifiableMap(runtimeTags); + public DDTracer( + final String serviceName, + final Writer writer, + final Sampler sampler, + final Map runtimeTags, + final Map defaultSpanTags, + final Map serviceNameMappings, + final Map taggedHeaders) { + this( + serviceName, + writer, + sampler, + runtimeTags, + defaultSpanTags, + serviceNameMappings, + taggedHeaders, + defaultMaxTraceSizeBeforePartialFlush()); } public DDTracer( @@ -183,7 +209,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace final Map runtimeTags, final Map defaultSpanTags, final Map serviceNameMappings, - final Map taggedHeaders) { + final Map taggedHeaders, + final int maxTraceSizeBeforePartialFlush) { assert runtimeTags != null; assert defaultSpanTags != null; assert serviceNameMappings != null; @@ -196,6 +223,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace this.defaultSpanTags = defaultSpanTags; this.runtimeTags = runtimeTags; this.serviceNameMappings = serviceNameMappings; + this.maxTraceSizeBeforePartialFlush = maxTraceSizeBeforePartialFlush; shutdownCallback = new Thread() { @@ -330,7 +358,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace * * @param trace a list of the spans related to the same trace */ - void write(final PendingTrace 
trace) { + void write(final Collection trace) { if (trace.isEmpty()) { return; } @@ -350,6 +378,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace } } incrementTraceCount(); + // TODO: current trace implementation doesn't guarantee that first span is the root span + // We may want to reconsider way this check is done. if (!writtenTrace.isEmpty() && sampler.sample(writtenTrace.get(0))) { writer.write(writtenTrace); } @@ -384,8 +414,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace } @Override - public void addScopeListener(ScopeListener listener) { - this.scopeManager.addScopeListener(listener); + public void addScopeListener(final ScopeListener listener) { + scopeManager.addScopeListener(listener); } @Override @@ -409,6 +439,19 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace + '}'; } + @Deprecated + private static Map customRuntimeTags(final String runtimeId) { + final Map runtimeTags = new HashMap<>(); + runtimeTags.putAll(Config.get().getRuntimeTags()); + runtimeTags.put(Config.RUNTIME_ID_TAG, runtimeId); + return Collections.unmodifiableMap(runtimeTags); + } + + @Deprecated + private static int defaultMaxTraceSizeBeforePartialFlush() { + return Config.get().getPartialFlushMinSpans(); + } + private static class CodecRegistry { private final Map, Codec> codecs = new HashMap<>(); diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java index 9656dfb78e..b2032445db 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java @@ -6,7 +6,10 @@ import java.io.Closeable; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; +import java.util.List; import 
java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -195,11 +198,31 @@ public class PendingTrace extends ConcurrentLinkedDeque { final int count = pendingReferenceCount.decrementAndGet(); if (count == 0) { write(); + } else { + if (tracer.getMaxTraceSizeBeforePartialFlush() > 0 + && size() > tracer.getMaxTraceSizeBeforePartialFlush()) { + synchronized (this) { + if (size() > tracer.getMaxTraceSizeBeforePartialFlush()) { + final DDSpan rootSpan = getRootSpan(); + final List partialTrace = new ArrayList(size()); + final Iterator it = iterator(); + while (it.hasNext()) { + final DDSpan span = it.next(); + if (span != rootSpan) { + partialTrace.add(span); + it.remove(); + } + } + log.debug("Writing partial trace {} of size {}", traceId, partialTrace.size()); + tracer.write(partialTrace); + } + } + } } log.debug("traceId: {} -- Expired reference. count = {}", traceId, count); } - private void write() { + private synchronized void write() { if (isWritten.compareAndSet(false, true)) { SPAN_CLEANER.pendingTraces.remove(this); if (!isEmpty()) { diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy index 8327221913..3b9e1a347a 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy @@ -1,6 +1,7 @@ package datadog.opentracing import datadog.trace.agent.test.TestUtils +import datadog.trace.api.Config import datadog.trace.common.writer.ListWriter import spock.lang.Specification import spock.lang.Subject @@ -9,6 +10,8 @@ import spock.lang.Timeout import java.lang.ref.WeakReference import java.util.concurrent.TimeUnit +import static datadog.trace.api.Config.PARTIAL_FLUSH_MIN_SPANS + class PendingTraceTest extends Specification { def writer = new ListWriter() def tracer = new DDTracer(writer) @@ -179,4 +182,96 @@ class PendingTraceTest 
extends Specification { // Generous 5 seconds to execute this test Math.abs(TimeUnit.NANOSECONDS.toSeconds(trace.currentTimeNano) - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())) < 5 } + + def "partial flush"() { + when: + def properties = new Properties() + properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "1") + def config = Config.get(properties) + def tracer = new DDTracer(config, writer) + def trace = new PendingTrace(tracer, traceIdStr, [:]) + def rootSpan = SpanFactory.newSpanOf(trace) + def child1 = tracer.buildSpan("child1").asChildOf(rootSpan).start() + def child2 = tracer.buildSpan("child2").asChildOf(rootSpan).start() + + then: + trace.pendingReferenceCount.get() == 3 + trace.weakReferences.size() == 3 + + when: + rootSpan.finish() + + then: + trace.pendingReferenceCount.get() == 2 + trace.weakReferences.size() == 2 + trace.asList() == [rootSpan] + writer == [] + tracer.traceCount.get() == 0 + + when: + child1.finish() + + then: + trace.pendingReferenceCount.get() == 1 + trace.weakReferences.size() == 1 + trace.asList() == [rootSpan] + writer == [[child1]] + tracer.traceCount.get() == 1 + + when: + child2.finish() + + then: + trace.pendingReferenceCount.get() == 0 + trace.weakReferences.size() == 0 + trace.asList() == [child2, rootSpan] + writer == [[child1], [child2, rootSpan]] + tracer.traceCount.get() == 2 + } + + def "partial flush with root span closed last"() { + when: + def properties = new Properties() + properties.setProperty(PARTIAL_FLUSH_MIN_SPANS, "1") + def config = Config.get(properties) + def tracer = new DDTracer(config, writer) + def trace = new PendingTrace(tracer, traceIdStr, [:]) + def rootSpan = SpanFactory.newSpanOf(trace) + def child1 = tracer.buildSpan("child1").asChildOf(rootSpan).start() + def child2 = tracer.buildSpan("child2").asChildOf(rootSpan).start() + + then: + trace.pendingReferenceCount.get() == 3 + trace.weakReferences.size() == 3 + + when: + child1.finish() + + then: + trace.pendingReferenceCount.get() == 
2 + trace.weakReferences.size() == 2 + trace.asList() == [child1] + writer == [] + tracer.traceCount.get() == 0 + + when: + child2.finish() + + then: + trace.pendingReferenceCount.get() == 1 + trace.weakReferences.size() == 1 + trace.asList() == [] + writer == [[child2, child1]] + tracer.traceCount.get() == 1 + + when: + rootSpan.finish() + + then: + trace.pendingReferenceCount.get() == 0 + trace.weakReferences.size() == 0 + trace.asList() == [rootSpan] + writer == [[child2, child1], [rootSpan]] + tracer.traceCount.get() == 2 + } }