Make Muzzle handle cleaner thread properly.

This commit is contained in:
Tyler Benson 2019-08-19 14:59:55 -07:00
parent 09213f63b4
commit 8c1dbfe7cb
12 changed files with 106 additions and 106 deletions

View File

@ -33,6 +33,7 @@ class MuzzlePlugin implements Plugin<Project> {
*/
private static final List<RemoteRepository> MUZZLE_REPOS
private static final AtomicReference<ClassLoader> TOOLING_LOADER = new AtomicReference<>()
private static final AtomicReference<Set<Thread>> ALLOWED_THREADS = new AtomicReference<>()
static {
RemoteRepository central = new RemoteRepository.Builder("central", "default", "http://central.maven.org/maven2/").build()
MUZZLE_REPOS = new ArrayList<RemoteRepository>(Arrays.asList(central))
@ -94,18 +95,6 @@ class MuzzlePlugin implements Plugin<Project> {
return
}
if (!project.rootProject.tasks.getNames().contains('muzzleCleanup')) {
def muzzleCleanup = project.rootProject.task('muzzleCleanup') {
group = 'Muzzle'
doLast {
project.rootProject.getLogger().info("Cleaning up global tooling loader")
TOOLING_LOADER.set(null)
}
}
muzzleCleanup.outputs.upToDateWhen { false }
}
project.tasks.muzzle.finalizedBy(project.rootProject.tasks.muzzleCleanup)
final RepositorySystem system = newRepositorySystem()
final RepositorySystemSession session = newRepositorySystemSession(system)
@ -131,18 +120,24 @@ class MuzzlePlugin implements Plugin<Project> {
}
private static ClassLoader getOrCreateToolingLoader(Project toolingProject) {
final ClassLoader toolingLoader = TOOLING_LOADER.get()
if (toolingLoader == null) {
Set<URL> ddUrls = new HashSet<>()
toolingProject.getLogger().info('creating classpath for agent-tooling')
for (File f : toolingProject.sourceSets.main.runtimeClasspath.getFiles()) {
toolingProject.getLogger().info('--' + f)
ddUrls.add(f.toURI().toURL())
synchronized (TOOLING_LOADER) {
final ClassLoader toolingLoader = TOOLING_LOADER.get()
if (toolingLoader == null) {
Set<URL> ddUrls = new HashSet<>()
toolingProject.getLogger().info('creating classpath for agent-tooling')
for (File f : toolingProject.sourceSets.main.runtimeClasspath.getFiles()) {
toolingProject.getLogger().info('--' + f)
ddUrls.add(f.toURI().toURL())
}
def loader = new URLClassLoader(ddUrls.toArray(new URL[0]), (ClassLoader) null)
assert TOOLING_LOADER.compareAndSet(null, loader)
loader.loadClass("datadog.trace.agent.tooling.AgentTooling").getMethod("init").invoke(null)
assert ALLOWED_THREADS.compareAndSet(null, new HashSet<Thread>(Arrays.asList(Thread.getThreads())))
assert ALLOWED_THREADS.get().size() > 0
return TOOLING_LOADER.get()
} else {
return toolingLoader
}
TOOLING_LOADER.compareAndSet(null, new URLClassLoader(ddUrls.toArray(new URL[0]), (ClassLoader) null))
return TOOLING_LOADER.get()
} else {
return toolingLoader
}
}
@ -263,7 +258,7 @@ class MuzzlePlugin implements Plugin<Project> {
private static Task addMuzzleTask(MuzzleDirective muzzleDirective, Artifact versionArtifact, Project instrumentationProject, Task runAfter, Project bootstrapProject, Project toolingProject) {
def taskName = "muzzle-Assert${muzzleDirective.assertPass ? "Pass" : "Fail"}-$versionArtifact.groupId-$versionArtifact.artifactId-$versionArtifact.version${muzzleDirective.name ? "-${muzzleDirective.getNameSlug()}" : ""}"
def config = instrumentationProject.configurations.create(taskName)
def dep = instrumentationProject.dependencies.create("$versionArtifact.groupId:$versionArtifact.artifactId:$versionArtifact.version") {
def dep = instrumentationProject.dependencies.create("$versionArtifact.groupId:$versionArtifact.artifactId:$versionArtifact.version") {
transitive = true
}
// The following optional transitive dependencies are brought in by some legacy module such as log4j 1.x but are no
@ -288,7 +283,7 @@ class MuzzlePlugin implements Plugin<Project> {
assertionMethod.invoke(null, instrumentationCL, userCL, muzzleDirective.assertPass)
for (Thread thread : Thread.getThreads()) {
if (thread.getName().startsWith("dd-")) {
if (!ALLOWED_THREADS.get().contains(thread)) {
throw new GradleException("Task $taskName has spawned a thread: $thread. This will prevent GC of dynamic muzzle classes. Aborting muzzle run.")
}
}

View File

@ -1 +0,0 @@
implementation-class=VersionScanPlugin

View File

@ -107,7 +107,7 @@ public interface WeakMap<K, V> {
}
@Override
public V getOrCreate(K key, ValueSupplier<V> supplier) {
public V getOrCreate(final K key, final ValueSupplier<V> supplier) {
if (!map.containsKey(key)) {
synchronized (this) {
if (!map.containsKey(key)) {

View File

@ -10,7 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.none;
import static net.bytebuddy.matcher.ElementMatchers.not;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.WeakMap;
import java.lang.instrument.Instrumentation;
import java.util.Collections;
import java.util.List;
@ -29,15 +28,6 @@ import net.bytebuddy.utility.JavaModule;
@Slf4j
public class AgentInstaller {
private static final Map<String, Runnable> classLoadCallbacks = new ConcurrentHashMap<>();
public static final Cleaner CLEANER = new Cleaner();
static {
// WeakMap is used by other classes below, so we need to register the provider first.
registerWeakMapProvider(CLEANER);
}
public static final DDLocationStrategy LOCATION_STRATEGY = new DDLocationStrategy();
public static final AgentBuilder.PoolStrategy POOL_STRATEGY = new DDCachingPoolStrategy(CLEANER);
private static volatile Instrumentation INSTRUMENTATION;
public static Instrumentation getInstrumentation() {
@ -68,10 +58,10 @@ public class AgentInstaller {
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(new RedefinitionLoggingListener())
.with(AgentBuilder.DescriptionStrategy.Default.POOL_ONLY)
.with(POOL_STRATEGY)
.with(AgentTooling.poolStrategy())
.with(new TransformLoggingListener())
.with(new ClassLoadListener())
.with(LOCATION_STRATEGY)
.with(AgentTooling.locationStrategy())
// FIXME: we cannot enable it yet due to BB/JVM bug, see
// https://github.com/raphw/byte-buddy/issues/558
// .with(AgentBuilder.LambdaInstrumentationStrategy.ENABLED)
@ -153,7 +143,6 @@ public class AgentInstaller {
}
log.debug("Installed {} instrumenter(s)", numInstrumenters);
CLEANER.start();
return agentBuilder.installOn(inst);
}
@ -173,14 +162,6 @@ public class AgentInstaller {
return matcher;
}
private static void registerWeakMapProvider(final Cleaner cleaner) {
if (!WeakMap.Provider.isProviderRegistered()) {
WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent(cleaner));
// WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent.Inline());
// WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.Guava());
}
}
@Slf4j
static class RedefinitionLoggingListener implements AgentBuilder.RedefinitionStrategy.Listener {

View File

@ -0,0 +1,35 @@
package datadog.trace.agent.tooling;
import datadog.trace.bootstrap.WeakMap;
public class AgentTooling {
private static final Cleaner CLEANER = new Cleaner();
static {
// WeakMap is used by other classes below, so we need to register the provider first.
registerWeakMapProvider(CLEANER);
}
private static final DDLocationStrategy LOCATION_STRATEGY = new DDLocationStrategy();
private static final DDCachingPoolStrategy POOL_STRATEGY = new DDCachingPoolStrategy(CLEANER);
public static void init() {
// Only need to trigger static initializers for now.
}
public static DDLocationStrategy locationStrategy() {
return LOCATION_STRATEGY;
}
public static DDCachingPoolStrategy poolStrategy() {
return POOL_STRATEGY;
}
private static void registerWeakMapProvider(final Cleaner cleaner) {
if (!WeakMap.Provider.isProviderRegistered()) {
WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent(cleaner));
// WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.WeakConcurrent.Inline());
// WeakMap.Provider.registerIfAbsent(new WeakMapSuppliers.Guava());
}
}
}

View File

@ -1,6 +1,7 @@
package datadog.trace.agent.tooling;
import java.lang.ref.WeakReference;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -25,47 +26,37 @@ class Cleaner {
}
};
private volatile ScheduledThreadPoolExecutor cleanerService = null;
private volatile Thread shutdownCallback = null;
private final ScheduledThreadPoolExecutor cleanerService;
private final Thread shutdownCallback;
Cleaner() {
cleanerService = new ScheduledThreadPoolExecutor(1, THREAD_FACTORY);
cleanerService.setRemoveOnCancelPolicy(true);
shutdownCallback = new ShutdownCallback(cleanerService);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
<T> void scheduleCleaning(
final T target, final Adapter<T> adapter, final long frequency, final TimeUnit unit) {
final CleanupRunnable<T> command = new CleanupRunnable<>(target, adapter);
if (cleanerService == null) {
log.warn("Cleaning scheduled before starting cleaner. Target won't be cleaned {}", target);
if (cleanerService.isShutdown()) {
log.warn("Cleaning scheduled but cleaner is shutdown. Target won't be cleaned {}", target);
} else {
command.setFuture(cleanerService.scheduleAtFixedRate(command, frequency, frequency, unit));
}
}
public void start() {
if (cleanerService == null) {
synchronized (this) {
if (cleanerService == null) {
cleanerService = new ScheduledThreadPoolExecutor(1, THREAD_FACTORY);
cleanerService.setRemoveOnCancelPolicy(true);
shutdownCallback = new ShutdownCallback(cleanerService);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
try {
command.setFuture(cleanerService.scheduleAtFixedRate(command, frequency, frequency, unit));
} catch (final RejectedExecutionException e) {
log.warn("Cleaning task rejected. Target won't be cleaned {}", target);
}
}
}
public void stop() {
if (cleanerService == null) {
synchronized (this) {
if (cleanerService == null) {
cleanerService.shutdown();
Runtime.getRuntime().removeShutdownHook(shutdownCallback);
cleanerService = null;
shutdownCallback = null;
}
}
}
private void stop() {
cleanerService.shutdownNow();
Runtime.getRuntime().removeShutdownHook(shutdownCallback);
}
@Override
@ -107,7 +98,7 @@ class Cleaner {
private final ScheduledExecutorService executorService;
public ShutdownCallback(final ScheduledExecutorService executorService) {
private ShutdownCallback(final ScheduledExecutorService executorService) {
this.executorService = executorService;
}

View File

@ -28,7 +28,7 @@ import net.bytebuddy.pool.TypePool;
* <p>See eviction policy below.
*/
public class DDCachingPoolStrategy implements PoolStrategy {
private static final WeakMap<ClassLoader, TypePool.CacheProvider> typePoolCache =
private final WeakMap<ClassLoader, TypePool.CacheProvider> typePoolCache =
WeakMap.Provider.newWeakMap();
private final Cleaner cleaner;

View File

@ -1,7 +1,10 @@
package datadog.trace.agent.tooling.muzzle;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.WeakMap;
import java.io.IOException;
import java.util.Collections;
import java.util.WeakHashMap;
import net.bytebuddy.build.Plugin;
import net.bytebuddy.description.type.TypeDefinition;
import net.bytebuddy.description.type.TypeDescription;
@ -10,6 +13,17 @@ import net.bytebuddy.dynamic.DynamicType;
/** Bytebuddy gradle plugin which creates muzzle-references at compile time. */
public class MuzzleGradlePlugin implements Plugin {
static {
// prevent WeakMap from logging warning while plugin is running
WeakMap.Provider.registerIfAbsent(
new WeakMap.Implementation() {
@Override
public <K, V> WeakMap<K, V> get() {
return new WeakMap.MapAdapter<>(Collections.synchronizedMap(new WeakHashMap<K, V>()));
}
});
}
private static final TypeDescription DefaultInstrumenterTypeDesc =
new TypeDescription.ForLoadedType(Instrumenter.Default.class);

View File

@ -1,16 +1,14 @@
package datadog.trace.agent.tooling.muzzle;
import datadog.trace.agent.tooling.AgentTooling;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.WeakMap;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.WeakHashMap;
import net.bytebuddy.dynamic.ClassFileLocator;
/**
@ -23,14 +21,7 @@ import net.bytebuddy.dynamic.ClassFileLocator;
*/
public class MuzzleVersionScanPlugin {
static {
// prevent WeakMap from logging warning while plugin is running
WeakMap.Provider.registerIfAbsent(
new WeakMap.Implementation() {
@Override
public <K, V> WeakMap<K, V> get() {
return new WeakMap.MapAdapter<>(Collections.synchronizedMap(new WeakHashMap<K, V>()));
}
});
AgentTooling.init();
}
public static void assertInstrumentationMuzzled(
@ -102,7 +93,7 @@ public class MuzzleVersionScanPlugin {
// only default Instrumenters use muzzle. Skip custom instrumenters.
continue;
}
Instrumenter.Default defaultInstrumenter = (Instrumenter.Default) instrumenter;
final Instrumenter.Default defaultInstrumenter = (Instrumenter.Default) instrumenter;
try {
// verify helper injector works
final String[] helperClassNames = defaultInstrumenter.helperClassNames();
@ -119,7 +110,7 @@ public class MuzzleVersionScanPlugin {
}
}
private static Map<String, byte[]> createHelperMap(Instrumenter.Default instrumenter)
private static Map<String, byte[]> createHelperMap(final Instrumenter.Default instrumenter)
throws IOException {
final Map<String, byte[]> helperMap =
new LinkedHashMap<>(instrumenter.helperClassNames().length);

View File

@ -3,7 +3,7 @@ package datadog.trace.agent.tooling.muzzle;
import static datadog.trace.bootstrap.WeakMap.Provider.newWeakMap;
import static net.bytebuddy.dynamic.loading.ClassLoadingStrategy.BOOTSTRAP_LOADER;
import datadog.trace.agent.tooling.AgentInstaller;
import datadog.trace.agent.tooling.AgentTooling;
import datadog.trace.agent.tooling.Utils;
import datadog.trace.agent.tooling.muzzle.Reference.Mismatch;
import datadog.trace.agent.tooling.muzzle.Reference.Source;
@ -84,8 +84,8 @@ public class ReferenceMatcher {
*/
public static List<Reference.Mismatch> checkMatch(Reference reference, ClassLoader loader) {
final TypePool typePool =
AgentInstaller.POOL_STRATEGY.typePool(
AgentInstaller.LOCATION_STRATEGY.classFileLocator(loader), loader);
AgentTooling.poolStrategy()
.typePool(AgentTooling.locationStrategy().classFileLocator(loader), loader);
final List<Mismatch> mismatches = new ArrayList<>(0);
try {
final TypePool.Resolution resolution =

View File

@ -9,13 +9,10 @@ import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.tooling.AgentInstaller.CLEANER
import static datadog.trace.agent.tooling.AgentTooling.CLEANER
//@Timeout(5)
class EvictingCacheProviderTest extends Specification {
static {
CLEANER.start()
}
def "test provider"() {
setup:

View File

@ -9,14 +9,11 @@ import spock.lang.Specification
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.tooling.AgentInstaller.CLEANER
import static datadog.trace.agent.tooling.AgentTooling.CLEANER
@Retry
// These tests fail sometimes in CI.
class WeakConcurrentSupplierTest extends Specification {
static {
CLEANER.start()
}
@Shared
def weakConcurrentSupplier = new WeakMapSuppliers.WeakConcurrent(CLEANER)
@Shared