Merge pull request #459 from DataDog/ark/gc-span-closing

gc span closing
This commit is contained in:
Andrew Kent 2018-08-27 11:57:37 -07:00 committed by GitHub
commit ab14c85fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 7 deletions

View File

@ -297,12 +297,17 @@ public class DDTracer implements io.opentracing.Tracer, Closeable {
} }
} }
} }
traceCount.incrementAndGet(); incrementTraceCount();
if (!writtenTrace.isEmpty() && sampler.sample(writtenTrace.get(0))) { if (!writtenTrace.isEmpty() && sampler.sample(writtenTrace.get(0))) {
writer.write(writtenTrace); writer.write(writtenTrace);
} }
} }
/** Increment the reported trace count, but do not write a trace. */
void incrementTraceCount() {
traceCount.incrementAndGet();
}
@Override @Override
public void close() { public void close() {
PendingTrace.close(); PendingTrace.close();

View File

@ -214,11 +214,20 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
int count = 0; int count = 0;
while ((ref = referenceQueue.poll()) != null) { while ((ref = referenceQueue.poll()) != null) {
weakReferences.remove(ref); weakReferences.remove(ref);
if (isWritten.compareAndSet(false, true)) {
SPAN_CLEANER.pendingTraces.remove(this);
// preserve throughput count.
// Don't report the trace because the data comes from buggy uses of the api and is suspect.
tracer.incrementTraceCount();
}
count++; count++;
expireReference(); expireReference();
} }
if (count > 0) { if (count > 0) {
log.debug("{} unfinished spans garbage collected!", count); log.debug(
"trace {} : {} unfinished spans garbage collected. Trace will not report.",
traceId,
count);
} }
return count > 0; return count > 0;
} }

View File

@ -4,6 +4,7 @@ import datadog.trace.agent.test.TestUtils
import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ListWriter
import spock.lang.Specification import spock.lang.Specification
import spock.lang.Subject import spock.lang.Subject
import spock.lang.Timeout
import java.lang.ref.WeakReference import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -94,7 +95,8 @@ class PendingTraceTest extends Specification {
traceCount.get() == 1 traceCount.get() == 1
} }
def "trace reported when unfinished child discarded"() { @Timeout(value = 60, unit = TimeUnit.SECONDS)
def "trace does not report when unfinished child discarded"() {
when: when:
def child = tracer.buildSpan("child").asChildOf(rootSpan).start() def child = tracer.buildSpan("child").asChildOf(rootSpan).start()
rootSpan.finish() rootSpan.finish()
@ -109,15 +111,17 @@ class PendingTraceTest extends Specification {
def childRef = new WeakReference<>(child) def childRef = new WeakReference<>(child)
child = null child = null
TestUtils.awaitGC(childRef) TestUtils.awaitGC(childRef)
while (trace.clean()) { while (trace.pendingReferenceCount.get() > 0) {
trace.clean()
} }
then: then:
trace.pendingReferenceCount.get() == 0 trace.pendingReferenceCount.get() == 0
trace.weakReferences.size() == 0 trace.weakReferences.size() == 0
trace.asList() == [rootSpan] trace.asList() == [rootSpan]
writer == [[rootSpan]] writer == []
traceCount.get() == 1 traceCount.get() == 1
!PendingTrace.SPAN_CLEANER.pendingTraces.contains(trace)
} }
def "add unfinished span to trace fails"() { def "add unfinished span to trace fails"() {

View File

@ -10,8 +10,10 @@ import io.opentracing.Span
import io.opentracing.noop.NoopSpan import io.opentracing.noop.NoopSpan
import spock.lang.Specification import spock.lang.Specification
import spock.lang.Subject import spock.lang.Subject
import spock.lang.Timeout
import java.lang.ref.WeakReference import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
@ -179,11 +181,13 @@ class ScopeManagerTest extends Specification {
false | true false | true
} }
@Timeout(value = 60, unit = TimeUnit.SECONDS)
def "hard reference on continuation prevents trace from reporting"() { def "hard reference on continuation prevents trace from reporting"() {
setup: setup:
def builder = tracer.buildSpan("test") def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(false) def scope = (ContinuableScope) builder.startActive(false)
def span = scope.span() def span = scope.span()
def traceCount = ((DDSpan) span).context().tracer.traceCount
scope.setAsyncPropagation(true) scope.setAsyncPropagation(true)
def continuation = scope.capture() def continuation = scope.capture()
scope.close() scope.close()
@ -199,7 +203,9 @@ class ScopeManagerTest extends Specification {
def continuationRef = new WeakReference<>(continuation) def continuationRef = new WeakReference<>(continuation)
continuation = null // Continuation references also hold up traces. continuation = null // Continuation references also hold up traces.
TestUtils.awaitGC(continuationRef) TestUtils.awaitGC(continuationRef)
writer.waitForTraces(1) while (traceCount.get() == 0) {
// wait until trace count increments or timeout expires
}
} }
if (autoClose) { if (autoClose) {
if (continuation != null) { if (continuation != null) {
@ -208,7 +214,8 @@ class ScopeManagerTest extends Specification {
} }
then: then:
writer == [[span]] forceGC ? true : writer == [[span]]
traceCount.get() == 1
where: where:
autoClose | forceGC autoClose | forceGC