Make `CommonTaskExecutor` periodic tasks safe

* Verify that a task can be scheduled and catch the exception when it cannot.
This should help avoid additional exceptions when the app crashes during
  startup.

* Avoid holding strong references from within executor to make sure
  that things can get GCed.
This commit is contained in:
Nikolay Martynov 2020-04-13 15:57:01 -04:00
parent af188c2901
commit 1fb844ab5f
4 changed files with 174 additions and 55 deletions

View File

@ -1,9 +1,7 @@
package datadog.trace.agent.tooling;
import datadog.common.exec.CommonTaskExecutor;
import java.lang.ref.WeakReference;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import datadog.common.exec.CommonTaskExecutor.Task;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@ -12,47 +10,25 @@ class Cleaner {
<T> void scheduleCleaning(
final T target, final Adapter<T> adapter, final long frequency, final TimeUnit unit) {
final CleanupRunnable<T> command = new CleanupRunnable<>(target, adapter);
if (CommonTaskExecutor.INSTANCE.isShutdown()) {
log.warn(
"Cleaning scheduled but task scheduler is shutdown. Target won't be cleaned {}", target);
} else {
try {
// Schedule job and save future to allow job to be canceled if target is GC'd.
command.setFuture(
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(command, frequency, frequency, unit));
} catch (final RejectedExecutionException e) {
log.warn("Cleaning task rejected. Target won't be cleaned {}", target);
}
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(
new CleaningTask(adapter), target, frequency, frequency, unit, "cleaner for " + target);
}
/**
 * Task that forwards each run to an {@link Adapter}.
 *
 * <p>Important to use explicit class to avoid implicit hard references to target: the only state
 * kept here is the adapter, the target itself is handed in on every call.
 */
private static class CleaningTask<T> implements Task<T> {
  private final Adapter<T> delegate;

  public CleaningTask(final Adapter<T> adapter) {
    delegate = adapter;
  }

  /** Cleans {@code target} through the adapter supplied at construction time. */
  @Override
  public void run(final T target) {
    delegate.clean(target);
  }
}
/**
 * Callback that performs the cleaning of a target object.
 *
 * @param <T> type of the object being cleaned
 */
public interface Adapter<T> {
/**
 * Invoked with the target every time the scheduled cleaning task runs.
 *
 * @param target object to clean
 */
void clean(T target);
}
/**
 * Runnable that cleans a weakly referenced target through an {@link Adapter} and cancels its own
 * schedule once the target is no longer reachable.
 */
private static class CleanupRunnable<T> implements Runnable {
  private final WeakReference<T> target;
  private final Adapter<T> adapter;
  // Assigned after scheduling; volatile so the executor thread observes the assignment.
  private volatile ScheduledFuture<?> future = null;

  private CleanupRunnable(final T target, final Adapter<T> adapter) {
    this.adapter = adapter;
    this.target = new WeakReference<>(target);
  }

  /** Remembers the future of the scheduled job so that {@link #run()} can cancel it later. */
  public void setFuture(final ScheduledFuture<?> future) {
    this.future = future;
  }

  @Override
  public void run() {
    final T referent = target.get();
    if (referent != null) {
      adapter.clean(referent);
      return;
    }
    // Target has been collected: stop the job, provided the future has already been assigned.
    if (future != null) {
      future.cancel(false);
    }
  }
}
}

View File

@ -1,6 +1,7 @@
package datadog.opentracing;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.CommonTaskExecutor.Task;
import datadog.opentracing.scopemanager.ContinuableScope;
import datadog.trace.common.util.Clock;
import java.io.Closeable;
@ -296,7 +297,13 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
Collections.newSetFromMap(new ConcurrentHashMap<PendingTrace, Boolean>());
public SpanCleaner() {
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(this, 0, CLEAN_FREQUENCY, TimeUnit.SECONDS);
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(
SpanCleanerTask.INSTANCE,
this,
0,
CLEAN_FREQUENCY,
TimeUnit.SECONDS,
"Pending trace cleaner");
}
@Override
@ -312,4 +319,17 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
run();
}
}
/*
 * Important to use explicit class to avoid implicit hard references to cleaners from within executor.
 *
 * The class declares no instance fields, so a single shared instance serves every cleaner; the
 * cleaner to operate on is passed in on each run.
 */
private static class SpanCleanerTask implements Task<SpanCleaner> {
// Shared instance handed to the executor by the SpanCleaner constructor.
static final SpanCleanerTask INSTANCE = new SpanCleanerTask();
@Override
public void run(final SpanCleaner target) {
// Delegate to the cleaner supplied by the caller.
target.run();
}
}
}

View File

@ -2,6 +2,7 @@ package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.CommonTaskExecutor.Task;
import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
@ -32,17 +33,8 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
if (0 < flushFrequencySeconds) {
// This provides a steady stream of events to enable flushing with a low throughput.
final Runnable heartbeat =
new Runnable() {
@Override
public void run() {
// Only add if the buffer is empty.
if (running && getCurrentCount() == 0) {
disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator);
}
}
};
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS);
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(
new HeartbeatTask(), this, 100, 100, TimeUnit.MILLISECONDS, "disruptor heartbeat");
}
}
@ -58,6 +50,12 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
return true;
}
/**
 * Publishes a heartbeat event, but only while the disruptor is running and its buffer is empty.
 */
private void heartbeat() {
  // Same short-circuit order as a single && condition: the count is not read when not running.
  if (!running || getCurrentCount() != 0) {
    return;
  }
  disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator);
}
// Intentionally not thread safe.
private static class BatchWritingHandler implements EventHandler<DisruptorEvent<byte[]>> {
@ -162,4 +160,12 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
}
}
}
// Important to use explicit class to avoid implicit hard references to BatchWritingDisruptor
// The class has no fields; the disruptor to operate on is passed in on each run.
private static final class HeartbeatTask implements Task<BatchWritingDisruptor> {
@Override
public void run(final BatchWritingDisruptor target) {
// Delegate to the disruptor's own heartbeat logic.
target.heartbeat();
}
}
}

View File

@ -1,8 +1,11 @@
package datadog.common.exec;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -25,9 +28,48 @@ public final class CommonTaskExecutor extends AbstractExecutorService {
}
}
public ScheduledFuture<?> scheduleAtFixedRate(
final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
return executorService.scheduleAtFixedRate(command, initialDelay, period, unit);
/**
 * Run {@code task} periodically providing it with {@code target}
 *
 * <p>Important implementation detail here is that internally we do not hold any strong references
 * to {@code target} which means it can be GCed even while periodic task is still scheduled.
 *
 * <p>If {@code target} is GCed periodic task is canceled.
 *
 * <p>If the scheduler is already shut down, or rejects the task, a warning is logged and an
 * {@link UnscheduledFuture} is returned instead of throwing.
 *
 * @param task task to run. Important: must not hold any strong references to target (or anything
 *     else non static)
 * @param target target object to pass to task
 * @param initialDelay initialDelay, see {@link
 *     ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}
 * @param period period, see {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long,
 *     long, TimeUnit)}
 * @param unit unit, see {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long,
 *     TimeUnit)}
 * @param name name to use in logs when task cannot be scheduled
 * @return future that can be canceled; an {@link UnscheduledFuture} when the task was not
 *     scheduled
 */
public <T> ScheduledFuture<?> scheduleAtFixedRate(
    final Task<T> task,
    final T target,
    final long initialDelay,
    final long period,
    final TimeUnit unit,
    final String name) {
  if (CommonTaskExecutor.INSTANCE.isShutdown()) {
    log.warn("Periodic task scheduler is shutdown. Will not run: {}", name);
  } else {
    try {
      final PeriodicTask<T> periodicTask = new PeriodicTask<>(task, target);
      // Schedule the very instance that receives the future below. Scheduling a second,
      // separate PeriodicTask would leave the running one without a future, so it could
      // never cancel itself once the target has been GCed.
      final ScheduledFuture<?> future =
          executorService.scheduleAtFixedRate(periodicTask, initialDelay, period, unit);
      periodicTask.setFuture(future);
      return future;
    } catch (final RejectedExecutionException e) {
      log.warn("Cleaning task rejected. Will not run: {}", name);
    }
  }
  return new UnscheduledFuture(name);
}
@Override
@ -82,4 +124,79 @@ public final class CommonTaskExecutor extends AbstractExecutorService {
}
}
}
/**
 * Unit of work that a periodic task runs against a target object.
 *
 * @param <T> type of the target object
 */
public interface Task<T> {
/**
 * Invoked by {@link PeriodicTask} on each run for which the target is still reachable.
 *
 * @param target object to operate on
 */
void run(T target);
}
/**
 * Runnable wrapper that keeps only a weak reference to its target and cancels its own schedule
 * once that target is no longer reachable.
 */
public static class PeriodicTask<T> implements Runnable {
  private final WeakReference<T> target;
  private final Task<T> task;
  // Assigned after scheduling; volatile so the executor thread observes the assignment.
  private volatile ScheduledFuture<?> future = null;

  private PeriodicTask(final Task<T> task, final T target) {
    this.task = task;
    this.target = new WeakReference<>(target);
  }

  /** Remembers the future of the scheduled job so that {@link #run()} can cancel it later. */
  public void setFuture(final ScheduledFuture<?> future) {
    this.future = future;
  }

  @Override
  public void run() {
    final T referent = target.get();
    if (referent != null) {
      task.run(referent);
      return;
    }
    // Target has been collected: stop the job, provided the future has already been assigned.
    if (future != null) {
      future.cancel(false);
    }
  }
}
/**
 * Placeholder future handed back when a periodic task could not be scheduled.
 *
 * <p>Nothing ever runs behind it: every query returns a fixed value and {@link #cancel(boolean)}
 * only logs.
 */
@Slf4j
public static class UnscheduledFuture implements ScheduledFuture<Object> {
  private final String name;

  /** @param name task name, used in the log line emitted by {@link #cancel(boolean)} */
  public UnscheduledFuture(final String name) {
    this.name = name;
  }

  // --- Future ---

  /** Logs the attempt at debug level; always returns {@code false}. */
  @Override
  public boolean cancel(final boolean mayInterruptIfRunning) {
    log.debug("Cancelling future for: {}", name);
    return false;
  }

  @Override
  public boolean isCancelled() {
    return false;
  }

  @Override
  public boolean isDone() {
    return false;
  }

  /** Returns {@code null} immediately. */
  @Override
  public Object get() {
    return null;
  }

  /** Returns {@code null} immediately; the timeout is ignored. */
  @Override
  public Object get(final long timeout, final TimeUnit unit) {
    return null;
  }

  // --- Delayed ---

  @Override
  public long getDelay(final TimeUnit unit) {
    return 0;
  }

  @Override
  public int compareTo(final Delayed o) {
    return 0;
  }
}
}