Merge pull request #547 from DataDog/mar-kolya/weak-concurrent-map-thread-cleanup

Mar kolya/weak concurrent map thread cleanup
This commit is contained in:
Nikolay Martynov 2018-10-23 18:12:24 -04:00 committed by GitHub
commit 14e85941c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 43 deletions

View File

@ -25,7 +25,7 @@ jobs:
- run:
name: Build Project
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx1G -Xms64M" ./gradlew clean :dd-java-agent:shadowJar compileTestGroovy compileTestScala compileTestJava --build-cache --parallel --stacktrace --no-daemon --max-workers=4
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx1G -Xms64M" ./gradlew clean :dd-java-agent:shadowJar compileTestGroovy compileTestScala compileTestJava --build-cache --parallel --stacktrace --no-daemon --max-workers=8
- run:
name: Collect Libs
@ -66,7 +66,7 @@ jobs:
- run:
name: Run Tests
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx2G -Xms512M" ./gradlew $TEST_TASK --build-cache --parallel --stacktrace --no-daemon --max-workers=3
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx2G -Xms512M" ./gradlew $TEST_TASK --build-cache --parallel --stacktrace --no-daemon --max-workers=6
- run:
name: Collect Reports
@ -130,7 +130,7 @@ jobs:
- run:
name: Run Trace Agent Tests
command: ./gradlew traceAgentTest --build-cache --parallel --stacktrace --no-daemon --max-workers=6
command: ./gradlew traceAgentTest --build-cache --parallel --stacktrace --no-daemon --max-workers=8
- run:
name: Collect Reports
@ -162,7 +162,7 @@ jobs:
- run:
name: Build Project
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx1G -Xms64M" ./gradlew check -PskipTests --build-cache --parallel --stacktrace --no-daemon --max-workers=4
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx1G -Xms64M" ./gradlew check -PskipTests --build-cache --parallel --stacktrace --no-daemon --max-workers=8
- run:
name: Collect Reports
@ -183,13 +183,34 @@ jobs:
- dd-trace-java-version-scan-{{ checksum "dd-trace-java.gradle" }}
- run:
name: Verify Version Scan and Muzzle
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx4G -Xms64M" ./gradlew verifyVersionScan muzzle --parallel --stacktrace --no-daemon --max-workers=2
name: Verify Version Scan
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx4G -Xms64M" ./gradlew verifyVersionScan --parallel --stacktrace --no-daemon --max-workers=8
- save_cache:
key: dd-trace-java-version-scan-{{ checksum "dd-trace-java.gradle" }}
paths: ~/.gradle
muzzle:
<<: *defaults
steps:
- checkout
- restore_cache:
# Reset the cache approx every release
keys:
- dd-trace-java-muzzle-{{ checksum "dd-trace-java.gradle" }}
- run:
name: Verify Muzzle
# Note: we do not have `--max-workers` here to have number of workers (threads) equal to number of CPUs (32 currently).
# This should speed things up slightly because muzzle may do a lot of IO bound work: reading off disk and downloading
# dependencies.
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx12G -Xms64M" ./gradlew muzzle --parallel --stacktrace --no-daemon
- save_cache:
key: dd-trace-java-muzzle-{{ checksum "dd-trace-java.gradle" }}
paths: ~/.gradle
publish: &publish
<<: *defaults
steps:
@ -276,6 +297,13 @@ workflows:
branches:
ignore: master
- muzzle:
requires:
- build
filters:
branches:
ignore: master
- publish_master:
requires:
- test_7

View File

@ -12,6 +12,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
class WeakMapSuppliers {
// Comparison with using WeakConcurrentMap vs Guava's implementation:
@ -57,48 +58,28 @@ class WeakMapSuppliers {
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps =
new ConcurrentLinkedQueue<>();
private final Runnable runnable =
new Runnable() {
@Override
public void run() {
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator =
suppliedMaps.iterator();
iterator.hasNext(); ) {
final WeakConcurrentMap map = iterator.next().get();
if (map == null) {
iterator.remove();
} else {
map.expungeStaleEntries();
}
}
}
};
private final AtomicBoolean finalized = new AtomicBoolean(false);
WeakConcurrent() {
cleanerExecutorService = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
cleanerExecutorService.scheduleAtFixedRate(
runnable, CLEAN_FREQUENCY_SECONDS, CLEAN_FREQUENCY_SECONDS, TimeUnit.SECONDS);
new CleanupRunnable(cleanerExecutorService, suppliedMaps, finalized),
CLEAN_FREQUENCY_SECONDS,
CLEAN_FREQUENCY_SECONDS,
TimeUnit.SECONDS);
try {
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@Override
public void run() {
try {
cleanerExecutorService.shutdownNow();
cleanerExecutorService.awaitTermination(
SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
// Don't bother waiting then...
}
}
});
Runtime.getRuntime().addShutdownHook(new ShutdownCallback(cleanerExecutorService));
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
@Override
public void finalize() {
finalized.set(true);
}
@Override
public <K, V> WeakMap<K, V> get() {
final WeakConcurrentMap<K, V> map = new WeakConcurrentMap<>(false);
@ -106,6 +87,57 @@ class WeakMapSuppliers {
return new Adapter<>(map);
}
private static class CleanupRunnable implements Runnable {
private final ScheduledExecutorService executorService;
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps;
private final AtomicBoolean finalized;
public CleanupRunnable(
final ScheduledExecutorService executorService,
final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps,
final AtomicBoolean finalized) {
this.executorService = executorService;
this.suppliedMaps = suppliedMaps;
this.finalized = finalized;
}
@Override
public void run() {
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator = suppliedMaps.iterator();
iterator.hasNext(); ) {
final WeakConcurrentMap map = iterator.next().get();
if (map == null) {
iterator.remove();
} else {
map.expungeStaleEntries();
}
}
if (finalized.get() && suppliedMaps.isEmpty()) {
executorService.shutdown();
}
}
}
private static final class ShutdownCallback extends Thread {
private final ScheduledExecutorService executorService;
public ShutdownCallback(final ScheduledExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void run() {
try {
executorService.shutdownNow();
executorService.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
// Don't bother waiting then...
}
}
}
private static class Adapter<K, V> implements WeakMap<K, V> {
private final WeakConcurrentMap<K, V> map;
@ -150,7 +182,7 @@ class WeakMapSuppliers {
return new WeakMap.MapAdapter<>(new MapMaker().weakKeys().<K, V>makeMap());
}
public <K, V> WeakMap<K, V> get(int concurrencyLevel) {
public <K, V> WeakMap<K, V> get(final int concurrencyLevel) {
return new WeakMap.MapAdapter<>(
new MapMaker().concurrencyLevel(concurrencyLevel).weakKeys().<K, V>makeMap());
}

View File

@ -38,14 +38,35 @@ class WeakConcurrentSupplierTest extends Specification {
"Guava" | guavaSupplier
}
def "Unreferenced map get cleaned up on #name"() {
def "Unreferenced supplier gets cleaned up on #name"() {
setup:
// Note: we use 'double supplier' here because Groovy keeps reference to test data preventing it from being GCed
def supplier = supplierSupplier()
def ref = new WeakReference(supplier)
when:
def supplierRef = new WeakReference(supplier)
supplier = null
TestUtils.awaitGC(supplierRef)
then:
ref.get() == null
where:
name | supplierSupplier
"WeakConcurrent" | { -> new WeakMapSuppliers.WeakConcurrent() }
"WeakInline" | { -> new WeakMapSuppliers.WeakConcurrent.Inline() }
"Guava" | { -> new WeakMapSuppliers.Guava() }
}
def "Unreferenced map gets cleaned up on #name"() {
setup:
WeakMap.Provider.provider.set(supplier)
def map = WeakMap.Provider.newWeakMap()
def ref = new WeakReference(map)
when:
def mapRef = new WeakReference<>(map)
def mapRef = new WeakReference(map)
map = null
TestUtils.awaitGC(mapRef)
@ -69,7 +90,7 @@ class WeakConcurrentSupplierTest extends Specification {
map.size() == 1
when:
def keyRef = new WeakReference<>(key)
def keyRef = new WeakReference(key)
key = null
TestUtils.awaitGC(keyRef)

View File

@ -1,4 +1,3 @@
plugins {
id 'io.franzbecker.gradle-lombok' version '1.13'
id 'com.jfrog.artifactory' version '4.7.5'

View File

@ -145,6 +145,7 @@ if (project.parent && project.parent.hasProperty("javaExecutableVersionCache"))
} else {
project.ext.javaExecutableVersionCache = [:]
}
def getJavaExecutableVersion(String path) {
def cache = project.ext.javaExecutableVersionCache
@ -177,7 +178,6 @@ def isJavaVersionAllowed(JavaVersion version) {
return true
}
// JVM names we would like to run complete test suite on
// Note: complete test suite is always run on JVM used for compilation
// Note2: apparently there is no way to have a 'global' variable, so instead we have per project