Add shutdown hook to span-cleaner thread

This commit is contained in:
Tyler Benson 2018-05-11 13:54:00 +10:00
parent 632d9b8da4
commit 97b3e537bd
2 changed files with 45 additions and 11 deletions

View File

@ -114,6 +114,15 @@ public class DDTracer implements io.opentracing.Tracer {
this.sampler = sampler;
this.spanTags = defaultSpanTags;
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
DDTracer.this.close();
}
});
registry = new CodecRegistry();
registry.register(Format.Builtin.HTTP_HEADERS, new HTTPCodec());
registry.register(Format.Builtin.TEXT_MAP, new HTTPCodec());
@ -271,6 +280,7 @@ public class DDTracer implements io.opentracing.Tracer {
}
public void close() {
PendingTrace.close();
writer.close();
}

View File

@ -1,6 +1,7 @@
package datadog.opentracing;
import datadog.opentracing.scopemanager.ContinuableScope;
import java.io.Closeable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
@ -18,8 +19,11 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
private static final SpanCleaner SPAN_CLEANER;
static {
SpanCleaner.start();
SPAN_CLEANER = new SpanCleaner();
SPAN_CLEANER.start();
}
private final DDTracer tracer;
@ -37,7 +41,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
PendingTrace(final DDTracer tracer, final long traceId) {
this.tracer = tracer;
this.traceId = traceId;
SpanCleaner.pendingTraces.add(this);
SPAN_CLEANER.pendingTraces.add(this);
}
public void registerSpan(final DDSpan span) {
@ -134,7 +138,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
private void write() {
if (isWritten.compareAndSet(false, true)) {
SpanCleaner.pendingTraces.remove(this);
SPAN_CLEANER.pendingTraces.remove(this);
if (!isEmpty()) {
log.debug("Writing {} spans to {}.", this.size(), tracer.writer);
tracer.write(this);
@ -170,26 +174,36 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
}
}
private static class SpanCleaner implements Runnable {
static void close() {
SPAN_CLEANER.close();
}
private static class SpanCleaner implements Runnable, Closeable {
private static final long CLEAN_FREQUENCY = 1;
private static final ThreadFactory FACTORY =
new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, "dd-span-cleaner");
thread.setDaemon(true);
return thread;
return new Thread(r, "dd-span-cleaner");
}
};
private static final ScheduledExecutorService EXECUTOR_SERVICE =
private final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, FACTORY);
static final Set<PendingTrace> pendingTraces =
private final Set<PendingTrace> pendingTraces =
Collections.newSetFromMap(new ConcurrentHashMap<PendingTrace, Boolean>());
static void start() {
EXECUTOR_SERVICE.scheduleAtFixedRate(new SpanCleaner(), 0, CLEAN_FREQUENCY, TimeUnit.SECONDS);
void start() {
executorService.scheduleAtFixedRate(new SpanCleaner(), 0, CLEAN_FREQUENCY, TimeUnit.SECONDS);
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
PendingTrace.SpanCleaner.this.close();
}
});
}
@Override
@ -198,5 +212,15 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
trace.clean();
}
}
@Override
public void close() {
executorService.shutdownNow();
try {
executorService.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
}
}
}