Code review changes

This commit is contained in:
Tyler Benson 2020-02-10 11:37:36 -08:00
parent 9c6cfbe359
commit 486d98135e
9 changed files with 105 additions and 80 deletions

View File

@ -1,8 +1,6 @@
package datadog.trace.agent.tooling; package datadog.trace.agent.tooling;
import static datadog.common.exec.SharedExecutors.isTaskSchedulerShutdown; import datadog.common.exec.CommonTaskExecutor;
import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -15,13 +13,14 @@ class Cleaner {
<T> void scheduleCleaning( <T> void scheduleCleaning(
final T target, final Adapter<T> adapter, final long frequency, final TimeUnit unit) { final T target, final Adapter<T> adapter, final long frequency, final TimeUnit unit) {
final CleanupRunnable<T> command = new CleanupRunnable<>(target, adapter); final CleanupRunnable<T> command = new CleanupRunnable<>(target, adapter);
if (isTaskSchedulerShutdown()) { if (CommonTaskExecutor.INSTANCE.isShutdown()) {
log.warn( log.warn(
"Cleaning scheduled but task scheduler is shutdown. Target won't be cleaned {}", target); "Cleaning scheduled but task scheduler is shutdown. Target won't be cleaned {}", target);
} else { } else {
try { try {
// Schedule job and save future to allow job to be canceled if target is GC'd. // Schedule job and save future to allow job to be canceled if target is GC'd.
command.setFuture(scheduleTaskAtFixedRate(command, frequency, frequency, unit)); command.setFuture(
CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(command, frequency, frequency, unit));
} catch (final RejectedExecutionException e) { } catch (final RejectedExecutionException e) {
log.warn("Cleaning task rejected. Target won't be cleaned {}", target); log.warn("Cleaning task rejected. Target won't be cleaned {}", target);
} }

View File

@ -1,5 +1,6 @@
package datadog.trace.agent.tooling package datadog.trace.agent.tooling
import datadog.common.exec.CommonTaskExecutor
import datadog.trace.util.gc.GCUtils import datadog.trace.util.gc.GCUtils
import datadog.trace.util.test.DDSpecification import datadog.trace.util.test.DDSpecification
import spock.lang.Subject import spock.lang.Subject
@ -8,7 +9,6 @@ import java.lang.ref.WeakReference
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import static datadog.common.exec.SharedExecutors.isTaskSchedulerShutdown
import static java.util.concurrent.TimeUnit.MILLISECONDS import static java.util.concurrent.TimeUnit.MILLISECONDS
class CleanerTest extends DDSpecification { class CleanerTest extends DDSpecification {
@ -28,7 +28,7 @@ class CleanerTest extends DDSpecification {
} }
expect: expect:
!isTaskSchedulerShutdown() !CommonTaskExecutor.INSTANCE.isShutdown()
when: when:
cleaner.scheduleCleaning(target, action, 10, MILLISECONDS) cleaner.scheduleCleaning(target, action, 10, MILLISECONDS)
@ -49,7 +49,7 @@ class CleanerTest extends DDSpecification {
} }
expect: expect:
!isTaskSchedulerShutdown() !CommonTaskExecutor.INSTANCE.isShutdown()
when: when:
cleaner.scheduleCleaning(target.get(), action, 10, MILLISECONDS) cleaner.scheduleCleaning(target.get(), action, 10, MILLISECONDS)
@ -73,7 +73,7 @@ class CleanerTest extends DDSpecification {
} }
expect: expect:
!isTaskSchedulerShutdown() !CommonTaskExecutor.INSTANCE.isShutdown()
when: when:
cleaner.scheduleCleaning(null, action, 10, MILLISECONDS) cleaner.scheduleCleaning(null, action, 10, MILLISECONDS)

View File

@ -1,7 +1,6 @@
package datadog.opentracing; package datadog.opentracing;
import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate; import datadog.common.exec.CommonTaskExecutor;
import datadog.opentracing.scopemanager.ContinuableScope; import datadog.opentracing.scopemanager.ContinuableScope;
import datadog.trace.common.util.Clock; import datadog.trace.common.util.Clock;
import java.io.Closeable; import java.io.Closeable;
@ -307,7 +306,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
Collections.newSetFromMap(new ConcurrentHashMap<PendingTrace, Boolean>()); Collections.newSetFromMap(new ConcurrentHashMap<PendingTrace, Boolean>());
public SpanCleaner() { public SpanCleaner() {
scheduleTaskAtFixedRate(this, 0, CLEAN_FREQUENCY, TimeUnit.SECONDS); CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(this, 0, CLEAN_FREQUENCY, TimeUnit.SECONDS);
} }
@Override @Override

View File

@ -1,9 +1,7 @@
package datadog.trace.common.writer.ddagent; package datadog.trace.common.writer.ddagent;
import static datadog.common.exec.DaemonThreadFactory.TRACE_WRITER;
import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory; import datadog.common.exec.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter; import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList; import java.util.ArrayList;
@ -44,13 +42,13 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
} }
} }
}; };
scheduleTaskAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS);
} }
} }
@Override @Override
protected DaemonThreadFactory getThreadFactory() { protected DaemonThreadFactory getThreadFactory() {
return TRACE_WRITER; return DaemonThreadFactory.TRACE_WRITER;
} }
@Override @Override

View File

@ -1,11 +1,11 @@
package datadog.trace.common.writer.ddagent; package datadog.trace.common.writer.ddagent;
import static datadog.common.exec.SharedExecutors.taskScheduler;
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER; import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER;
import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi; import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types; import com.squareup.moshi.Types;
import datadog.common.exec.CommonTaskExecutor;
import datadog.opentracing.ContainerInfo; import datadog.opentracing.ContainerInfo;
import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo; import datadog.opentracing.DDTraceOTInfo;
@ -267,7 +267,7 @@ public class DDAgentApi {
.readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS) .readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
// We don't do async so this shouldn't matter, but just to be safe... // We don't do async so this shouldn't matter, but just to be safe...
.dispatcher(new Dispatcher(taskScheduler())) .dispatcher(new Dispatcher(CommonTaskExecutor.INSTANCE))
.build(); .build();
} }

View File

@ -1,7 +1,5 @@
package datadog.trace.common.writer.ddagent; package datadog.trace.common.writer.ddagent;
import static datadog.common.exec.DaemonThreadFactory.TRACE_PROCESSOR;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
import datadog.common.exec.DaemonThreadFactory; import datadog.common.exec.DaemonThreadFactory;
import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpan;
@ -31,7 +29,7 @@ public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
@Override @Override
protected DaemonThreadFactory getThreadFactory() { protected DaemonThreadFactory getThreadFactory() {
return TRACE_PROCESSOR; return DaemonThreadFactory.TRACE_PROCESSOR;
} }
@Override @Override

View File

@ -0,0 +1,85 @@
package datadog.common.exec;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class CommonTaskExecutor extends AbstractExecutorService {
public static final CommonTaskExecutor INSTANCE = new CommonTaskExecutor();
private static final long SHUTDOWN_WAIT_SECONDS = 5;
private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(DaemonThreadFactory.TASK_SCHEDULER);
private CommonTaskExecutor() {
try {
Runtime.getRuntime().addShutdownHook(new ShutdownCallback(executorService));
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
log.debug("Error adding shutdown hook", ex);
}
}
public ScheduledFuture<?> scheduleAtFixedRate(
final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
return executorService.scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public void shutdown() {
executorService.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}
@Override
public boolean isShutdown() {
return executorService.isShutdown();
}
@Override
public boolean isTerminated() {
return executorService.isTerminated();
}
@Override
public boolean awaitTermination(final long timeout, final TimeUnit unit)
throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}
@Override
public void execute(final Runnable command) {
executorService.execute(command);
}
private static final class ShutdownCallback extends Thread {
private final ScheduledExecutorService executorService;
private ShutdownCallback(final ScheduledExecutorService executorService) {
super("dd-exec-shutdown-hook");
this.executorService = executorService;
}
@Override
public void run() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
executorService.shutdownNow();
}
}
}
}

View File

@ -1,58 +0,0 @@
package datadog.common.exec;
import static datadog.common.exec.DaemonThreadFactory.TASK_SCHEDULER;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public final class SharedExecutors {
private static final long SHUTDOWN_WAIT_SECONDS = 5;
private static final ScheduledExecutorService TASK_SCHEDULER_EXECUTOR_SERVICE =
Executors.newSingleThreadScheduledExecutor(TASK_SCHEDULER);
static {
try {
Runtime.getRuntime().addShutdownHook(new ShutdownCallback(TASK_SCHEDULER_EXECUTOR_SERVICE));
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
public static ScheduledExecutorService taskScheduler() {
return TASK_SCHEDULER_EXECUTOR_SERVICE;
}
public static ScheduledFuture<?> scheduleTaskAtFixedRate(
final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
return TASK_SCHEDULER_EXECUTOR_SERVICE.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public static boolean isTaskSchedulerShutdown() {
return TASK_SCHEDULER_EXECUTOR_SERVICE.isShutdown();
}
private static final class ShutdownCallback extends Thread {
private final ScheduledExecutorService executorService;
private ShutdownCallback(final ScheduledExecutorService executorService) {
super("dd-exec-shutdown-hook");
this.executorService = executorService;
}
@Override
public void run() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
executorService.shutdownNow();
}
}
}
}

View File

@ -1 +1,5 @@
apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/java.gradle"
dependencies {
compile deps.slf4j
}