From 486d98135eabdae3fe511a944bfab85a1d0c6363 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Mon, 10 Feb 2020 11:37:36 -0800 Subject: [PATCH] Code review changes --- .../datadog/trace/agent/tooling/Cleaner.java | 9 +- .../trace/agent/tooling/CleanerTest.groovy | 8 +- .../datadog/opentracing/PendingTrace.java | 5 +- .../writer/ddagent/BatchWritingDisruptor.java | 8 +- .../common/writer/ddagent/DDAgentApi.java | 4 +- .../ddagent/TraceProcessingDisruptor.java | 4 +- .../common/exec/CommonTaskExecutor.java | 85 +++++++++++++++++++ .../datadog/common/exec/SharedExecutors.java | 58 ------------- utils/thread-utils/thread-utils.gradle | 4 + 9 files changed, 105 insertions(+), 80 deletions(-) create mode 100644 utils/thread-utils/src/main/java/datadog/common/exec/CommonTaskExecutor.java delete mode 100644 utils/thread-utils/src/main/java/datadog/common/exec/SharedExecutors.java diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Cleaner.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Cleaner.java index 1813b87cd5..d4c0807823 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Cleaner.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Cleaner.java @@ -1,8 +1,6 @@ package datadog.trace.agent.tooling; -import static datadog.common.exec.SharedExecutors.isTaskSchedulerShutdown; -import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate; - +import datadog.common.exec.CommonTaskExecutor; import java.lang.ref.WeakReference; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; @@ -15,13 +13,14 @@ class Cleaner { void scheduleCleaning( final T target, final Adapter adapter, final long frequency, final TimeUnit unit) { final CleanupRunnable command = new CleanupRunnable<>(target, adapter); - if (isTaskSchedulerShutdown()) { + if (CommonTaskExecutor.INSTANCE.isShutdown()) { log.warn( "Cleaning scheduled but task scheduler is shutdown. Target won't be cleaned {}", target); } else { try { // Schedule job and save future to allow job to be canceled if target is GC'd. - command.setFuture(scheduleTaskAtFixedRate(command, frequency, frequency, unit)); + command.setFuture( + CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(command, frequency, frequency, unit)); } catch (final RejectedExecutionException e) { log.warn("Cleaning task rejected. Target won't be cleaned {}", target); } diff --git a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/CleanerTest.groovy b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/CleanerTest.groovy index c74481c784..53f76137ed 100644 --- a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/CleanerTest.groovy +++ b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/CleanerTest.groovy @@ -1,5 +1,6 @@ package datadog.trace.agent.tooling +import datadog.common.exec.CommonTaskExecutor import datadog.trace.util.gc.GCUtils import datadog.trace.util.test.DDSpecification import spock.lang.Subject @@ -8,7 +9,6 @@ import java.lang.ref.WeakReference import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger -import static datadog.common.exec.SharedExecutors.isTaskSchedulerShutdown import static java.util.concurrent.TimeUnit.MILLISECONDS class CleanerTest extends DDSpecification { @@ -28,7 +28,7 @@ class CleanerTest extends DDSpecification { } expect: - !isTaskSchedulerShutdown() + !CommonTaskExecutor.INSTANCE.isShutdown() when: cleaner.scheduleCleaning(target, action, 10, MILLISECONDS) @@ -49,7 +49,7 @@ class CleanerTest extends DDSpecification { } expect: - !isTaskSchedulerShutdown() + !CommonTaskExecutor.INSTANCE.isShutdown() when: cleaner.scheduleCleaning(target.get(), action, 10, MILLISECONDS) @@ -73,7 +73,7 @@ class CleanerTest extends DDSpecification { } expect: - !isTaskSchedulerShutdown() + !CommonTaskExecutor.INSTANCE.isShutdown() when: cleaner.scheduleCleaning(null, action, 10, MILLISECONDS) diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java index 3ceaff45d1..60a4fc26fa 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java @@ -1,7 +1,6 @@ package datadog.opentracing; -import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate; - +import datadog.common.exec.CommonTaskExecutor; import datadog.opentracing.scopemanager.ContinuableScope; import datadog.trace.common.util.Clock; import java.io.Closeable; @@ -307,7 +306,7 @@ public class PendingTrace extends ConcurrentLinkedDeque { Collections.newSetFromMap(new ConcurrentHashMap()); public SpanCleaner() { - scheduleTaskAtFixedRate(this, 0, CLEAN_FREQUENCY, TimeUnit.SECONDS); + CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(this, 0, CLEAN_FREQUENCY, TimeUnit.SECONDS); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java index 299718e5a9..abd3e30176 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java @@ -1,9 +1,7 @@ package datadog.trace.common.writer.ddagent; -import static datadog.common.exec.DaemonThreadFactory.TRACE_WRITER; -import static datadog.common.exec.SharedExecutors.scheduleTaskAtFixedRate; - import com.lmax.disruptor.EventHandler; +import datadog.common.exec.CommonTaskExecutor; import datadog.common.exec.DaemonThreadFactory; import datadog.trace.common.writer.DDAgentWriter; import java.util.ArrayList; @@ -44,13 +42,13 @@ public class BatchWritingDisruptor extends AbstractDisruptor { } } }; - scheduleTaskAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); + CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); } } @Override protected DaemonThreadFactory getThreadFactory() { - return TRACE_WRITER; + return DaemonThreadFactory.TRACE_WRITER; } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index 16e6b60e60..a0a0f7ad02 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -1,11 +1,11 @@ package datadog.trace.common.writer.ddagent; -import static datadog.common.exec.SharedExecutors.taskScheduler; import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER; import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; import com.squareup.moshi.Types; +import datadog.common.exec.CommonTaskExecutor; import datadog.opentracing.ContainerInfo; import datadog.opentracing.DDSpan; import datadog.opentracing.DDTraceOTInfo; @@ -267,7 +267,7 @@ public class DDAgentApi { .readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS) // We don't do async so this shouldn't matter, but just to be safe... - .dispatcher(new Dispatcher(taskScheduler())) + .dispatcher(new Dispatcher(CommonTaskExecutor.INSTANCE)) .build(); } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java index 3966e3dcec..86d0e7177b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -1,7 +1,5 @@ package datadog.trace.common.writer.ddagent; -import static datadog.common.exec.DaemonThreadFactory.TRACE_PROCESSOR; - import com.lmax.disruptor.EventHandler; import datadog.common.exec.DaemonThreadFactory; import datadog.opentracing.DDSpan; @@ -31,7 +29,7 @@ public class TraceProcessingDisruptor extends AbstractDisruptor> { @Override protected DaemonThreadFactory getThreadFactory() { - return TRACE_PROCESSOR; + return DaemonThreadFactory.TRACE_PROCESSOR; } @Override diff --git a/utils/thread-utils/src/main/java/datadog/common/exec/CommonTaskExecutor.java b/utils/thread-utils/src/main/java/datadog/common/exec/CommonTaskExecutor.java new file mode 100644 index 0000000000..a2f1d57960 --- /dev/null +++ b/utils/thread-utils/src/main/java/datadog/common/exec/CommonTaskExecutor.java @@ -0,0 +1,85 @@ +package datadog.common.exec; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class CommonTaskExecutor extends AbstractExecutorService { + public static final CommonTaskExecutor INSTANCE = new CommonTaskExecutor(); + private static final long SHUTDOWN_WAIT_SECONDS = 5; + + private final ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(DaemonThreadFactory.TASK_SCHEDULER); + + private CommonTaskExecutor() { + try { + Runtime.getRuntime().addShutdownHook(new ShutdownCallback(executorService)); + } catch (final IllegalStateException ex) { + // The JVM is already shutting down. + log.debug("Error adding shutdown hook", ex); + } + } + + public ScheduledFuture scheduleAtFixedRate( + final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + return executorService.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + + @Override + public List shutdownNow() { + return executorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executorService.isTerminated(); + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) + throws InterruptedException { + return executorService.awaitTermination(timeout, unit); + } + + @Override + public void execute(final Runnable command) { + executorService.execute(command); + } + + private static final class ShutdownCallback extends Thread { + + private final ScheduledExecutorService executorService; + + private ShutdownCallback(final ScheduledExecutorService executorService) { + super("dd-exec-shutdown-hook"); + this.executorService = executorService; + } + + @Override + public void run() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (final InterruptedException e) { + executorService.shutdownNow(); + } + } + } +} diff --git a/utils/thread-utils/src/main/java/datadog/common/exec/SharedExecutors.java b/utils/thread-utils/src/main/java/datadog/common/exec/SharedExecutors.java deleted file mode 100644 index 2f17716efd..0000000000 --- a/utils/thread-utils/src/main/java/datadog/common/exec/SharedExecutors.java +++ /dev/null @@ -1,58 +0,0 @@ -package datadog.common.exec; - -import static datadog.common.exec.DaemonThreadFactory.TASK_SCHEDULER; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public final class SharedExecutors { - private static final long SHUTDOWN_WAIT_SECONDS = 5; - - private static final ScheduledExecutorService TASK_SCHEDULER_EXECUTOR_SERVICE = - Executors.newSingleThreadScheduledExecutor(TASK_SCHEDULER); - - static { - try { - Runtime.getRuntime().addShutdownHook(new ShutdownCallback(TASK_SCHEDULER_EXECUTOR_SERVICE)); - } catch (final IllegalStateException ex) { - // The JVM is already shutting down. - } - } - - public static ScheduledExecutorService taskScheduler() { - return TASK_SCHEDULER_EXECUTOR_SERVICE; - } - - public static ScheduledFuture scheduleTaskAtFixedRate( - final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { - return TASK_SCHEDULER_EXECUTOR_SERVICE.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - public static boolean isTaskSchedulerShutdown() { - return TASK_SCHEDULER_EXECUTOR_SERVICE.isShutdown(); - } - - private static final class ShutdownCallback extends Thread { - - private final ScheduledExecutorService executorService; - - private ShutdownCallback(final ScheduledExecutorService executorService) { - super("dd-exec-shutdown-hook"); - this.executorService = executorService; - } - - @Override - public void run() { - executorService.shutdown(); - try { - if (!executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - } - } catch (final InterruptedException e) { - executorService.shutdownNow(); - } - } - } -} diff --git a/utils/thread-utils/thread-utils.gradle b/utils/thread-utils/thread-utils.gradle index ff6901ae6f..29bb468d6e 100644 --- a/utils/thread-utils/thread-utils.gradle +++ b/utils/thread-utils/thread-utils.gradle @@ -1 +1,5 @@ apply from: "${rootDir}/gradle/java.gradle" + +dependencies { + compile deps.slf4j +}