Merge pull request #436 from DataDog/mar-kolya/use-WeakConcurrentMap-suggestions
WeakMap improvements
This commit is contained in:
commit
e702928e19
|
@ -73,5 +73,10 @@ public interface WeakMap<K, V> {
|
||||||
public void put(final K key, final V value) {
|
public void put(final K key, final V value) {
|
||||||
map.put(key, value);
|
map.put(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return map.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package datadog.trace.agent.tooling;
|
package datadog.trace.agent.tooling;
|
||||||
|
|
||||||
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap;
|
import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.MapMaker;
|
import com.google.common.collect.MapMaker;
|
||||||
import datadog.trace.bootstrap.WeakMap;
|
import datadog.trace.bootstrap.WeakMap;
|
||||||
import java.lang.ref.WeakReference;
|
import java.lang.ref.WeakReference;
|
||||||
|
@ -35,9 +36,11 @@ class WeakMapSuppliers {
|
||||||
* second.
|
* second.
|
||||||
*/
|
*/
|
||||||
static class WeakConcurrent implements WeakMap.Supplier {
|
static class WeakConcurrent implements WeakMap.Supplier {
|
||||||
private static final long CLEAN_FREQUENCY_SECONDS = 1;
|
|
||||||
private static final long SHUTDOWN_WAIT_SECONDS = 5;
|
private static final long SHUTDOWN_WAIT_SECONDS = 5;
|
||||||
|
|
||||||
|
@VisibleForTesting static final long CLEAN_FREQUENCY_SECONDS = 1;
|
||||||
|
|
||||||
private static final ThreadFactory THREAD_FACTORY =
|
private static final ThreadFactory THREAD_FACTORY =
|
||||||
new ThreadFactory() {
|
new ThreadFactory() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -49,28 +52,7 @@ class WeakMapSuppliers {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final ScheduledExecutorService CLEANER =
|
private final ScheduledExecutorService cleanerExecutorService;
|
||||||
Executors.newScheduledThreadPool(1, THREAD_FACTORY);
|
|
||||||
|
|
||||||
static {
|
|
||||||
try {
|
|
||||||
Runtime.getRuntime()
|
|
||||||
.addShutdownHook(
|
|
||||||
new Thread() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
CLEANER.shutdownNow();
|
|
||||||
CLEANER.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
|
|
||||||
} catch (final InterruptedException e) {
|
|
||||||
// Don't bother waiting then...
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (final IllegalStateException ex) {
|
|
||||||
// The JVM is already shutting down.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps =
|
private final Queue<WeakReference<WeakConcurrentMap>> suppliedMaps =
|
||||||
new ConcurrentLinkedQueue<>();
|
new ConcurrentLinkedQueue<>();
|
||||||
|
@ -79,24 +61,41 @@ class WeakMapSuppliers {
|
||||||
new Runnable() {
|
new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
cleanMaps();
|
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator =
|
||||||
|
suppliedMaps.iterator();
|
||||||
|
iterator.hasNext(); ) {
|
||||||
|
final WeakConcurrentMap map = iterator.next().get();
|
||||||
|
if (map == null) {
|
||||||
|
iterator.remove();
|
||||||
|
} else {
|
||||||
|
map.expungeStaleEntries();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
WeakConcurrent() {
|
WeakConcurrent() {
|
||||||
CLEANER.scheduleAtFixedRate(
|
cleanerExecutorService = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
|
||||||
|
cleanerExecutorService.scheduleAtFixedRate(
|
||||||
runnable, CLEAN_FREQUENCY_SECONDS, CLEAN_FREQUENCY_SECONDS, TimeUnit.SECONDS);
|
runnable, CLEAN_FREQUENCY_SECONDS, CLEAN_FREQUENCY_SECONDS, TimeUnit.SECONDS);
|
||||||
}
|
|
||||||
|
|
||||||
public void cleanMaps() {
|
try {
|
||||||
for (final Iterator<WeakReference<WeakConcurrentMap>> iterator = suppliedMaps.iterator();
|
Runtime.getRuntime()
|
||||||
iterator.hasNext(); ) {
|
.addShutdownHook(
|
||||||
final WeakConcurrentMap map = iterator.next().get();
|
new Thread() {
|
||||||
if (map == null) {
|
@Override
|
||||||
iterator.remove();
|
public void run() {
|
||||||
} else {
|
try {
|
||||||
map.expungeStaleEntries();
|
cleanerExecutorService.shutdownNow();
|
||||||
}
|
cleanerExecutorService.awaitTermination(
|
||||||
|
SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
|
||||||
|
} catch (final InterruptedException e) {
|
||||||
|
// Don't bother waiting then...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (final IllegalStateException ex) {
|
||||||
|
// The JVM is already shutting down.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,5 +149,10 @@ class WeakMapSuppliers {
|
||||||
public <K, V> WeakMap<K, V> get() {
|
public <K, V> WeakMap<K, V> get() {
|
||||||
return new WeakMap.MapAdapter<>(new MapMaker().weakKeys().<K, V>makeMap());
|
return new WeakMap.MapAdapter<>(new MapMaker().weakKeys().<K, V>makeMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <K, V> WeakMap<K, V> get(int concurrencyLevel) {
|
||||||
|
return new WeakMap.MapAdapter<>(
|
||||||
|
new MapMaker().concurrencyLevel(concurrencyLevel).weakKeys().<K, V>makeMap());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,44 +2,105 @@ package datadog.trace.agent.tooling
|
||||||
|
|
||||||
import datadog.trace.agent.test.TestUtils
|
import datadog.trace.agent.test.TestUtils
|
||||||
import datadog.trace.bootstrap.WeakMap
|
import datadog.trace.bootstrap.WeakMap
|
||||||
|
import spock.lang.Shared
|
||||||
import spock.lang.Specification
|
import spock.lang.Specification
|
||||||
|
|
||||||
|
import java.lang.ref.WeakReference
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class WeakConcurrentSupplierTest extends Specification {
|
class WeakConcurrentSupplierTest extends Specification {
|
||||||
def supplier = new WeakMapSuppliers.WeakConcurrent()
|
@Shared
|
||||||
|
def weakConcurrentSupplier = new WeakMapSuppliers.WeakConcurrent()
|
||||||
|
@Shared
|
||||||
|
def weakInlineSupplier = new WeakMapSuppliers.WeakConcurrent.Inline()
|
||||||
|
@Shared
|
||||||
|
def guavaSupplier = new WeakMapSuppliers.Guava()
|
||||||
|
|
||||||
def setup() {
|
def "Calling newWeakMap on #name creates independent maps"() {
|
||||||
WeakMap.Provider.provider.set(supplier)
|
|
||||||
}
|
|
||||||
|
|
||||||
def "calling new adds to the list"() {
|
|
||||||
when:
|
|
||||||
def map1 = WeakMap.Provider.newWeakMap().map
|
|
||||||
|
|
||||||
then:
|
|
||||||
supplier.suppliedMaps.iterator().next().get() == map1
|
|
||||||
|
|
||||||
when:
|
|
||||||
def map2 = WeakMap.Provider.newWeakMap().map
|
|
||||||
def iterator = supplier.suppliedMaps.iterator()
|
|
||||||
|
|
||||||
then:
|
|
||||||
iterator.next().get() == map1
|
|
||||||
iterator.next().get() == map2
|
|
||||||
}
|
|
||||||
|
|
||||||
def "calling cleanMaps does cleanup"() {
|
|
||||||
setup:
|
setup:
|
||||||
|
WeakMap.Provider.provider.set(supplier)
|
||||||
|
def key = new Object()
|
||||||
|
def map1 = WeakMap.Provider.newWeakMap()
|
||||||
|
def map2 = WeakMap.Provider.newWeakMap()
|
||||||
|
|
||||||
|
when:
|
||||||
|
map1.put(key, "value1")
|
||||||
|
map2.put(key, "value2")
|
||||||
|
|
||||||
|
then:
|
||||||
|
map1.get(key) == "value1"
|
||||||
|
map2.get(key) == "value2"
|
||||||
|
|
||||||
|
where:
|
||||||
|
name | supplier
|
||||||
|
"WeakConcurrent" | weakConcurrentSupplier
|
||||||
|
"WeakInline" | weakInlineSupplier
|
||||||
|
"Guava" | guavaSupplier
|
||||||
|
}
|
||||||
|
|
||||||
|
def "Unreferenced map get cleaned up on #name"() {
|
||||||
|
setup:
|
||||||
|
WeakMap.Provider.provider.set(supplier)
|
||||||
def map = WeakMap.Provider.newWeakMap()
|
def map = WeakMap.Provider.newWeakMap()
|
||||||
map.put(new Object(), "value")
|
def ref = new WeakReference(map)
|
||||||
|
|
||||||
|
when:
|
||||||
|
map = null
|
||||||
|
TestUtils.awaitGC()
|
||||||
|
|
||||||
|
then:
|
||||||
|
ref.get() == null
|
||||||
|
|
||||||
|
where:
|
||||||
|
name | supplier
|
||||||
|
"WeakConcurrent" | weakConcurrentSupplier
|
||||||
|
"WeakInline" | weakInlineSupplier
|
||||||
|
"Guava" | guavaSupplier
|
||||||
|
}
|
||||||
|
|
||||||
|
def "Unreferenced keys get cleaned up on #name"() {
|
||||||
|
setup:
|
||||||
|
def key = new Object()
|
||||||
|
map.put(key, "value")
|
||||||
TestUtils.awaitGC()
|
TestUtils.awaitGC()
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
map.size() == 1 // This might result in an error if supplier's cleaner thread is activated.
|
map.size() == 1
|
||||||
|
|
||||||
when:
|
when:
|
||||||
supplier.cleanMaps()
|
key = null
|
||||||
|
TestUtils.awaitGC()
|
||||||
|
|
||||||
|
if (name == "WeakConcurrent") {
|
||||||
|
// Sleep enough time for cleanup thread to get scheduled.
|
||||||
|
// But on a very slow box (or high load) scheduling may not be exactly predictable
|
||||||
|
// so we try a few times.
|
||||||
|
int count = 0
|
||||||
|
while (map.size() != 0 && count < 10) {
|
||||||
|
Thread.sleep(TimeUnit.SECONDS.toMillis(WeakMapSuppliers.WeakConcurrent.CLEAN_FREQUENCY_SECONDS))
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hit map a few times to trigger unreferenced entries cleanup.
|
||||||
|
// Exact number of times that we need to hit map is implementation dependent.
|
||||||
|
// For Guava it i specified in
|
||||||
|
// com.google.common.collect.MapMakerInternalMap.DRAIN_THRESHOLD = 0x3F
|
||||||
|
if (name == "Guava" || name == "WeakInline") {
|
||||||
|
for (int i = 0; i <= 0x3F; i++) {
|
||||||
|
map.get("test")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
map.size() == 0
|
map.size() == 0
|
||||||
|
|
||||||
|
where:
|
||||||
|
name | map
|
||||||
|
"WeakConcurrent" | weakConcurrentSupplier.get()
|
||||||
|
"WeakInline" | weakInlineSupplier.get()
|
||||||
|
// Guava's cleanup process depends on concurrency level,
|
||||||
|
// and in order to be able to test it we need to set concurrency to 1
|
||||||
|
"Guava" | guavaSupplier.get(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue