From fc8cc47f850ab13d5b92e1f2d4b0dbb2435cdf2e Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Thu, 9 Aug 2018 22:58:24 -0700 Subject: [PATCH] WeakMap improvements * Improve tests to not depend on thread schedulting * Test all underlying implementations * Reduce number of static values --- .../java/datadog/trace/bootstrap/WeakMap.java | 5 + .../trace/agent/tooling/WeakMapSuppliers.java | 74 ++++++------ .../tooling/WeakConcurrentSupplierTest.groovy | 111 ++++++++++++++---- 3 files changed, 130 insertions(+), 60 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/WeakMap.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/WeakMap.java index 070bc4391f..d1fdc6aff8 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/WeakMap.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/WeakMap.java @@ -73,5 +73,10 @@ public interface WeakMap { public void put(final K key, final V value) { map.put(key, value); } + + @Override + public String toString() { + return map.toString(); + } } } diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/WeakMapSuppliers.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/WeakMapSuppliers.java index 484adeac75..9f948e2228 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/WeakMapSuppliers.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/WeakMapSuppliers.java @@ -1,6 +1,7 @@ package datadog.trace.agent.tooling; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.MapMaker; import datadog.trace.bootstrap.WeakMap; import java.lang.ref.WeakReference; @@ -35,9 +36,11 @@ class WeakMapSuppliers { * second. */ static class WeakConcurrent implements WeakMap.Supplier { - private static final long CLEAN_FREQUENCY_SECONDS = 1; + private static final long SHUTDOWN_WAIT_SECONDS = 5; + @VisibleForTesting static final long CLEAN_FREQUENCY_SECONDS = 1; + private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { @Override @@ -49,28 +52,7 @@ class WeakMapSuppliers { } }; - private static final ScheduledExecutorService CLEANER = - Executors.newScheduledThreadPool(1, THREAD_FACTORY); - - static { - try { - Runtime.getRuntime() - .addShutdownHook( - new Thread() { - @Override - public void run() { - try { - CLEANER.shutdownNow(); - CLEANER.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - // Don't bother waiting then... - } - } - }); - } catch (final IllegalStateException ex) { - // The JVM is already shutting down. - } - } + private final ScheduledExecutorService cleanerExecutorService; private final Queue> suppliedMaps = new ConcurrentLinkedQueue<>(); @@ -79,24 +61,41 @@ class WeakMapSuppliers { new Runnable() { @Override public void run() { - cleanMaps(); + for (final Iterator> iterator = + suppliedMaps.iterator(); + iterator.hasNext(); ) { + final WeakConcurrentMap map = iterator.next().get(); + if (map == null) { + iterator.remove(); + } else { + map.expungeStaleEntries(); + } + } } }; WeakConcurrent() { - CLEANER.scheduleAtFixedRate( + cleanerExecutorService = Executors.newScheduledThreadPool(1, THREAD_FACTORY); + cleanerExecutorService.scheduleAtFixedRate( runnable, CLEAN_FREQUENCY_SECONDS, CLEAN_FREQUENCY_SECONDS, TimeUnit.SECONDS); - } - public void cleanMaps() { - for (final Iterator> iterator = suppliedMaps.iterator(); - iterator.hasNext(); ) { - final WeakConcurrentMap map = iterator.next().get(); - if (map == null) { - iterator.remove(); - } else { - map.expungeStaleEntries(); - } + try { + Runtime.getRuntime() + .addShutdownHook( + new Thread() { + @Override + public void run() { + try { + cleanerExecutorService.shutdownNow(); + cleanerExecutorService.awaitTermination( + SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + // Don't bother waiting then... + } + } + }); + } catch (final IllegalStateException ex) { + // The JVM is already shutting down. } } @@ -150,5 +149,10 @@ class WeakMapSuppliers { public WeakMap get() { return new WeakMap.MapAdapter<>(new MapMaker().weakKeys().makeMap()); } + + public WeakMap get(int concurrencyLevel) { + return new WeakMap.MapAdapter<>( + new MapMaker().concurrencyLevel(concurrencyLevel).weakKeys().makeMap()); + } } } diff --git a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/WeakConcurrentSupplierTest.groovy b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/WeakConcurrentSupplierTest.groovy index 84ac195f79..a8245c5570 100644 --- a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/WeakConcurrentSupplierTest.groovy +++ b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/tooling/WeakConcurrentSupplierTest.groovy @@ -2,44 +2,105 @@ package datadog.trace.agent.tooling import datadog.trace.agent.test.TestUtils import datadog.trace.bootstrap.WeakMap +import spock.lang.Shared import spock.lang.Specification +import java.lang.ref.WeakReference +import java.util.concurrent.TimeUnit + class WeakConcurrentSupplierTest extends Specification { - def supplier = new WeakMapSuppliers.WeakConcurrent() + @Shared + def weakConcurrentSupplier = new WeakMapSuppliers.WeakConcurrent() + @Shared + def weakInlineSupplier = new WeakMapSuppliers.WeakConcurrent.Inline() + @Shared + def guavaSupplier = new WeakMapSuppliers.Guava() - def setup() { - WeakMap.Provider.provider.set(supplier) - } - - def "calling new adds to the list"() { - when: - def map1 = WeakMap.Provider.newWeakMap().map - - then: - supplier.suppliedMaps.iterator().next().get() == map1 - - when: - def map2 = WeakMap.Provider.newWeakMap().map - def iterator = supplier.suppliedMaps.iterator() - - then: - iterator.next().get() == map1 - iterator.next().get() == map2 - } - - def "calling cleanMaps does cleanup"() { + def "Calling newWeakMap on #name creates independent maps"() { setup: + WeakMap.Provider.provider.set(supplier) + def key = new Object() + def map1 = WeakMap.Provider.newWeakMap() + def map2 = WeakMap.Provider.newWeakMap() + + when: + map1.put(key, "value1") + map2.put(key, "value2") + + then: + map1.get(key) == "value1" + map2.get(key) == "value2" + + where: + name | supplier + "WeakConcurrent" | weakConcurrentSupplier + "WeakInline" | weakInlineSupplier + "Guava" | guavaSupplier + } + + def "Unreferenced map get cleaned up on #name"() { + setup: + WeakMap.Provider.provider.set(supplier) def map = WeakMap.Provider.newWeakMap() - map.put(new Object(), "value") + def ref = new WeakReference(map) + + when: + map = null + TestUtils.awaitGC() + + then: + ref.get() == null + + where: + name | supplier + "WeakConcurrent" | weakConcurrentSupplier + "WeakInline" | weakInlineSupplier + "Guava" | guavaSupplier + } + + def "Unreferenced keys get cleaned up on #name"() { + setup: + def key = new Object() + map.put(key, "value") TestUtils.awaitGC() expect: - map.size() == 1 // This might result in an error if supplier's cleaner thread is activated. + map.size() == 1 when: - supplier.cleanMaps() + key = null + TestUtils.awaitGC() + + if (name == "WeakConcurrent") { + // Sleep enough time for cleanup thread to get scheduled. + // But on a very slow box (or high load) scheduling may not be exactly predictable + // so we try a few times. + int count = 0 + while (map.size() != 0 && count < 10) { + Thread.sleep(TimeUnit.SECONDS.toMillis(WeakMapSuppliers.WeakConcurrent.CLEAN_FREQUENCY_SECONDS)) + count++ + } + } + + // Hit map a few times to trigger unreferenced entries cleanup. + // Exact number of times that we need to hit map is implementation dependent. + // For Guava it i specified in + // com.google.common.collect.MapMakerInternalMap.DRAIN_THRESHOLD = 0x3F + if (name == "Guava" || name == "WeakInline") { + for (int i = 0; i <= 0x3F; i++) { + map.get("test") + } + } then: map.size() == 0 + + where: + name | map + "WeakConcurrent" | weakConcurrentSupplier.get() + "WeakInline" | weakInlineSupplier.get() + // Guava's cleanup process depends on concurrency level, + // and in order to be able to test it we need to set concurrency to 1 + "Guava" | guavaSupplier.get(1) } }