diff --git a/api/build.gradle b/api/build.gradle index a3fb1c09c4..4190232eed 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -19,7 +19,3 @@ dependencies { testImplementation libraries.jqf, libraries.guava_testlib } - -javadoc { - exclude 'io/opentelemetry/internal/**' -} diff --git a/build.gradle b/build.gradle index 134bb8e11f..d8af128b5a 100644 --- a/build.gradle +++ b/build.gradle @@ -200,6 +200,10 @@ configure(opentelemetryProjects) { withSourcesJar() } + javadoc { + exclude 'io/opentelemetry/internal/**' + } + tasks { def testJava8 = register('testJava8', Test) { javaLauncher = javaToolchains.launcherFor { diff --git a/context/src/main/java/io/opentelemetry/context/internal/shaded/AbstractWeakConcurrentMap.java b/context/src/main/java/io/opentelemetry/context/internal/shaded/AbstractWeakConcurrentMap.java new file mode 100644 index 0000000000..91a99b7ba2 --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/internal/shaded/AbstractWeakConcurrentMap.java @@ -0,0 +1,374 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright Rafael Winterhalter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Suppress warnings since this is vendored as-is. +// CHECKSTYLE:OFF + +package io.opentelemetry.context.internal.shaded; + +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A thread-safe map with weak keys. Entries are based on a key's system hash code and keys are + * considered equal only by reference equality. This class offers an abstract-base implementation + * that allows to override methods. This class does not implement the {@link Map} interface because + * this implementation is incompatible with the map contract. While iterating over a map's entries, + * any key that has not passed iteration is referenced non-weakly. + * + *

This class has been copied as is from + * https://github.com/raphw/weak-lock-free/blob/ad0e5e0c04d4a31f9485bf12b89afbc9d75473b3/src/main/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMap.java + * This is used in multiple artifacts in OpenTelemetry and while it is in our internal API, + * generally backwards compatible changes should not be made to avoid a situation where different + * versions of OpenTelemetry artifacts become incompatible with each other. + */ +// Suppress warnings since this is vendored as-is. +@SuppressWarnings({"MissingSummary", "EqualsBrokenForNull", "FieldMissingNullable"}) +public abstract class AbstractWeakConcurrentMap extends ReferenceQueue + implements Runnable, Iterable> { + + final ConcurrentMap, V> target; + + protected AbstractWeakConcurrentMap() { + this(new ConcurrentHashMap, V>()); + } + + /** @param target ConcurrentMap implementation that this class wraps. */ + protected AbstractWeakConcurrentMap(ConcurrentMap, V> target) { + this.target = target; + } + + /** + * Override with care as it can cause lookup failures if done incorrectly. The result must have + * the same {@link Object#hashCode()} as the input and be {@link Object#equals(Object) equal to} a + * weak reference of the key. When overriding this, also override {@link #resetLookupKey}. + */ + protected abstract L getLookupKey(K key); + + /** Resets any reusable state in the {@linkplain #getLookupKey lookup key}. */ + protected abstract void resetLookupKey(L lookupKey); + + /** + * @param key The key of the entry. + * @return The value of the entry or the default value if it did not exist. + */ + public V get(K key) { + if (key == null) throw new NullPointerException(); + V value; + L lookupKey = getLookupKey(key); + try { + value = target.get(lookupKey); + } finally { + resetLookupKey(lookupKey); + } + if (value == null) { + value = defaultValue(key); + if (value != null) { + V previousValue = target.putIfAbsent(new WeakKey(key, this), value); + if (previousValue != null) { + value = previousValue; + } + } + } + return value; + } + + /** + * @param key The key of the entry. + * @return The value of the entry or null if it did not exist. + */ + public V getIfPresent(K key) { + if (key == null) throw new NullPointerException(); + L lookupKey = getLookupKey(key); + try { + return target.get(lookupKey); + } finally { + resetLookupKey(lookupKey); + } + } + + /** + * @param key The key of the entry. + * @return {@code true} if the key already defines a value. + */ + public boolean containsKey(K key) { + if (key == null) throw new NullPointerException(); + L lookupKey = getLookupKey(key); + try { + return target.containsKey(lookupKey); + } finally { + resetLookupKey(lookupKey); + } + } + + /** + * @param key The key of the entry. + * @param value The value of the entry. + * @return The previous entry or {@code null} if it does not exist. + */ + public V put(K key, V value) { + if (key == null || value == null) throw new NullPointerException(); + return target.put(new WeakKey(key, this), value); + } + + /** + * @param key The key of the entry. + * @param value The value of the entry. + * @return The previous entry or {@code null} if it does not exist. + */ + public V putIfAbsent(K key, V value) { + if (key == null || value == null) throw new NullPointerException(); + V previous; + L lookupKey = getLookupKey(key); + try { + previous = target.get(lookupKey); + } finally { + resetLookupKey(lookupKey); + } + return previous == null ? target.putIfAbsent(new WeakKey(key, this), value) : previous; + } + + /** + * @param key The key of the entry. + * @param value The value of the entry. + * @return The previous entry or {@code null} if it does not exist. + */ + public V putIfProbablyAbsent(K key, V value) { + if (key == null || value == null) throw new NullPointerException(); + return target.putIfAbsent(new WeakKey(key, this), value); + } + + /** + * @param key The key of the entry. + * @return The removed entry or {@code null} if it does not exist. + */ + public V remove(K key) { + if (key == null) throw new NullPointerException(); + L lookupKey = getLookupKey(key); + try { + return target.remove(lookupKey); + } finally { + resetLookupKey(lookupKey); + } + } + + /** Clears the entire map. */ + public void clear() { + target.clear(); + } + + /** + * Creates a default value. There is no guarantee that the requested value will be set as a once + * it is created in case that another thread requests a value for a key concurrently. + * + * @param key The key for which to create a default value. + * @return The default value for a key without value or {@code null} for not defining a default + * value. + */ + protected V defaultValue(K key) { + return null; + } + + /** Cleans all unused references. */ + public void expungeStaleEntries() { + Reference reference; + while ((reference = poll()) != null) { + target.remove(reference); + } + } + + /** + * Returns the approximate size of this map where the returned number is at least as big as the + * actual number of entries. + * + * @return The minimum size of this map. + */ + public int approximateSize() { + return target.size(); + } + + @Override + public void run() { + try { + while (!Thread.interrupted()) { + target.remove(remove()); + } + } catch (InterruptedException ignored) { + // do nothing + } + } + + @Override + public Iterator> iterator() { + return new EntryIterator(target.entrySet().iterator()); + } + + @Override + public String toString() { + return target.toString(); + } + + /* + * Why this works: + * --------------- + * + * Note that this map only supports reference equality for keys and uses system hash codes. Also, for the + * WeakKey instances to function correctly, we are voluntarily breaking the Java API contract for + * hashCode/equals of these instances. + * + * System hash codes are immutable and can therefore be computed prematurely and are stored explicitly + * within the WeakKey instances. This way, we always know the correct hash code of a key and always + * end up in the correct bucket of our target map. This remains true even after the weakly referenced + * key is collected. + * + * If we are looking up the value of the current key via WeakConcurrentMap::get or any other public + * API method, we know that any value associated with this key must still be in the map as the mere + * existence of this key makes it ineligible for garbage collection. Therefore, looking up a value + * using another WeakKey wrapper guarantees a correct result. + * + * If we are looking up the map entry of a WeakKey after polling it from the reference queue, we know + * that the actual key was already collected and calling WeakKey::get returns null for both the polled + * instance and the instance within the map. Since we explicitly stored the identity hash code for the + * referenced value, it is however trivial to identify the correct bucket. From this bucket, the first + * weak key with a null reference is removed. Due to hash collision, we do not know if this entry + * represents the weak key. However, we do know that the reference queue polls at least as many weak + * keys as there are stale map entries within the target map. If no key is ever removed from the map + * explicitly, the reference queue eventually polls exactly as many weak keys as there are stale entries. + * + * Therefore, we can guarantee that there is no memory leak. + * + * It is the responsibility of the actual map implementation to implement a lookup key that is used for + * lookups. The lookup key must supply the same semantics as the weak key with regards to hash code. + * The weak key invokes the latent key's equality method upon evaluation. + */ + + public static final class WeakKey extends WeakReference { + + private final int hashCode; + + WeakKey(K key, ReferenceQueue queue) { + super(key, queue); + hashCode = System.identityHashCode(key); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object other) { + if (other instanceof WeakKey) { + return ((WeakKey) other).get() == get(); + } else { + return other.equals(this); + } + } + + @Override + public String toString() { + return String.valueOf(get()); + } + } + + private class EntryIterator implements Iterator> { + + private final Iterator, V>> iterator; + + private Map.Entry, V> nextEntry; + + private K nextKey; + + private EntryIterator(Iterator, V>> iterator) { + this.iterator = iterator; + findNext(); + } + + private void findNext() { + while (iterator.hasNext()) { + nextEntry = iterator.next(); + nextKey = nextEntry.getKey().get(); + if (nextKey != null) { + return; + } + } + nextEntry = null; + nextKey = null; + } + + @Override + public boolean hasNext() { + return nextKey != null; + } + + @Override + public Map.Entry next() { + if (nextKey == null) { + throw new NoSuchElementException(); + } + try { + return new SimpleEntry(nextKey, nextEntry); + } finally { + findNext(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private class SimpleEntry implements Map.Entry { + + private final K key; + + final Map.Entry, V> entry; + + private SimpleEntry(K key, Map.Entry, V> entry) { + this.key = key; + this.entry = entry; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return entry.getValue(); + } + + @Override + public V setValue(V value) { + if (value == null) throw new NullPointerException(); + return entry.setValue(value); + } + } +} diff --git a/context/src/main/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMap.java b/context/src/main/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMap.java new file mode 100644 index 0000000000..8f3db0f5bd --- /dev/null +++ b/context/src/main/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMap.java @@ -0,0 +1,241 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright Rafael Winterhalter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Suppress warnings since this is vendored as-is. +// CHECKSTYLE:OFF + +package io.opentelemetry.context.internal.shaded; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A thread-safe map with weak keys. Entries are based on a key's system hash code and keys are + * considered equal only by reference equality. This class does not implement the {@link + * java.util.Map} interface because this implementation is incompatible with the map contract. While + * iterating over a map's entries, any key that has not passed iteration is referenced non-weakly. + * + *

This class has been copied as is from + * https://github.com/raphw/weak-lock-free/blob/ad0e5e0c04d4a31f9485bf12b89afbc9d75473b3/src/main/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMap.java + * This is used in multiple artifacts in OpenTelemetry and while it is in our internal API, + * generally backwards compatible changes should not be made to avoid a situation where different + * versions of OpenTelemetry artifacts become incompatible with each other. + */ +// Suppress warnings since this is copied as-is. +@SuppressWarnings({ + "MissingSummary", + "UngroupedOverloads", + "ThreadPriorityCheck", + "FieldMissingNullable" +}) +public class WeakConcurrentMap + extends AbstractWeakConcurrentMap> { + + /** + * Lookup keys are cached thread-locally to avoid allocations on lookups. This is beneficial as + * the JIT unfortunately can't reliably replace the {@link LookupKey} allocation with stack + * allocations, even though the {@link LookupKey} does not escape. + */ + private static final ThreadLocal> LOOKUP_KEY_CACHE = + new ThreadLocal>() { + @Override + protected LookupKey initialValue() { + return new LookupKey(); + } + }; + + private static final AtomicLong ID = new AtomicLong(); + + private final Thread thread; + + private final boolean reuseKeys; + + /** @param cleanerThread {@code true} if a thread should be started that removes stale entries. */ + public WeakConcurrentMap(boolean cleanerThread) { + this(cleanerThread, isPersistentClassLoader(LookupKey.class.getClassLoader())); + } + + /** + * Checks whether the provided {@link ClassLoader} may be unloaded like a web application class + * loader, for example. + * + *

If the class loader can't be unloaded, it is safe to use {@link ThreadLocal}s and to reuse + * the {@link LookupKey}. Otherwise, the use of {@link ThreadLocal}s may lead to class loader + * leaks as it prevents the class loader this class is loaded by to unload. + * + * @param classLoader The class loader to check. + * @return {@code true} if the provided class loader can be unloaded. + */ + private static boolean isPersistentClassLoader(ClassLoader classLoader) { + try { + return classLoader == null // bootstrap class loader + || classLoader == ClassLoader.getSystemClassLoader() + || classLoader + == ClassLoader.getSystemClassLoader().getParent(); // ext/platfrom class loader; + } catch (Throwable ignored) { + return false; + } + } + + /** + * @param cleanerThread {@code true} if a thread should be started that removes stale entries. + * @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}. + * Note that setting this to {@code true} may result in class loader leaks. See {@link + * #isPersistentClassLoader(ClassLoader)} for more details. + */ + public WeakConcurrentMap(boolean cleanerThread, boolean reuseKeys) { + this(cleanerThread, reuseKeys, new ConcurrentHashMap, V>()); + } + + /** + * @param cleanerThread {@code true} if a thread should be started that removes stale entries. + * @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}. + * Note that setting this to {@code true} may result in class loader leaks. See {@link + * #isPersistentClassLoader(ClassLoader)} for more details. + * @param target ConcurrentMap implementation that this class wraps. + */ + public WeakConcurrentMap( + boolean cleanerThread, boolean reuseKeys, ConcurrentMap, V> target) { + super(target); + this.reuseKeys = reuseKeys; + if (cleanerThread) { + thread = new Thread(this); + thread.setName("weak-ref-cleaner-" + ID.getAndIncrement()); + thread.setPriority(Thread.MIN_PRIORITY); + thread.setDaemon(true); + thread.start(); + } else { + thread = null; + } + } + + @Override + @SuppressWarnings("unchecked") + protected LookupKey getLookupKey(K key) { + LookupKey lookupKey; + if (reuseKeys) { + lookupKey = (LookupKey) LOOKUP_KEY_CACHE.get(); + } else { + lookupKey = new LookupKey(); + } + return lookupKey.withValue(key); + } + + @Override + protected void resetLookupKey(LookupKey lookupKey) { + lookupKey.reset(); + } + + /** @return The cleaner thread or {@code null} if no such thread was set. */ + public Thread getCleanerThread() { + return thread; + } + + /* + * A lookup key must only be used for looking up instances within a map. For this to work, it implements an identical contract for + * hash code and equals as the WeakKey implementation. At the same time, the lookup key implementation does not extend WeakReference + * and avoids the overhead that a weak reference implies. + */ + + // can't use AutoClosable/try-with-resources as this project still supports Java 6 + static final class LookupKey { + + private K key; + private int hashCode; + + LookupKey withValue(K key) { + this.key = key; + hashCode = System.identityHashCode(key); + return this; + } + + /** Failing to reset a lookup key can lead to memory leaks as the key is strongly referenced. */ + void reset() { + key = null; + hashCode = 0; + } + + @Override + public boolean equals(Object other) { + if (other instanceof WeakConcurrentMap.LookupKey) { + return ((LookupKey) other).key == key; + } else { + return ((WeakKey) other).get() == key; + } + } + + @Override + public int hashCode() { + return hashCode; + } + } + + /** + * A {@link WeakConcurrentMap} where stale entries are removed as a side effect of interacting + * with this map. + */ + public static class WithInlinedExpunction extends WeakConcurrentMap { + + public WithInlinedExpunction() { + super(false); + } + + @Override + public V get(K key) { + expungeStaleEntries(); + return super.get(key); + } + + @Override + public boolean containsKey(K key) { + expungeStaleEntries(); + return super.containsKey(key); + } + + @Override + public V put(K key, V value) { + expungeStaleEntries(); + return super.put(key, value); + } + + @Override + public V remove(K key) { + expungeStaleEntries(); + return super.remove(key); + } + + @Override + public Iterator> iterator() { + expungeStaleEntries(); + return super.iterator(); + } + + @Override + public int approximateSize() { + expungeStaleEntries(); + return super.approximateSize(); + } + } +} diff --git a/context/src/test/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMapTest.java b/context/src/test/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMapTest.java new file mode 100644 index 0000000000..3d635be779 --- /dev/null +++ b/context/src/test/java/io/opentelemetry/context/internal/shaded/WeakConcurrentMapTest.java @@ -0,0 +1,178 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright Rafael Winterhalter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Suppress warnings since this is vendored as-is. +// CHECKSTYLE:OFF + +package io.opentelemetry.context.internal.shaded; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +// Suppress warnings since this is copied as-is. +@SuppressWarnings({ + "overrides", + "UnusedVariable", + "EqualsHashCode", + "MultiVariableDeclaration", +}) +class WeakConcurrentMapTest { + + @Test + void testLocalExpunction() throws Exception { + final WeakConcurrentMap.WithInlinedExpunction map = + new WeakConcurrentMap.WithInlinedExpunction(); + assertThat(map.getCleanerThread(), nullValue(Thread.class)); + new MapTestCase(map) { + @Override + protected void triggerClean() { + map.expungeStaleEntries(); + } + }.doTest(); + } + + @Test + void testExternalThread() throws Exception { + WeakConcurrentMap map = new WeakConcurrentMap(false); + assertThat(map.getCleanerThread(), nullValue(Thread.class)); + Thread thread = new Thread(map); + thread.start(); + new MapTestCase(map).doTest(); + thread.interrupt(); + Thread.sleep(200L); + assertThat(thread.isAlive(), is(false)); + } + + @Test + void testInternalThread() throws Exception { + WeakConcurrentMap map = new WeakConcurrentMap(true); + assertThat(map.getCleanerThread(), not(nullValue(Thread.class))); + new MapTestCase(map).doTest(); + map.getCleanerThread().interrupt(); + Thread.sleep(200L); + assertThat(map.getCleanerThread().isAlive(), is(false)); + } + + static class KeyEqualToWeakRefOfItself { + + @Override + public boolean equals(Object obj) { + if (obj instanceof WeakReference) { + return equals(((WeakReference) obj).get()); + } + return super.equals(obj); + } + } + + static class CheapUnloadableWeakConcurrentMap + extends AbstractWeakConcurrentMap { + + @Override + protected Object getLookupKey(KeyEqualToWeakRefOfItself key) { + return key; + } + + @Override + protected void resetLookupKey(Object lookupKey) {} + } + + @Test + void testKeyWithWeakRefEquals() { + CheapUnloadableWeakConcurrentMap map = new CheapUnloadableWeakConcurrentMap(); + + KeyEqualToWeakRefOfItself key = new KeyEqualToWeakRefOfItself(); + Object value = new Object(); + map.put(key, value); + assertThat(map.containsKey(key), is(true)); + assertThat(map.get(key), is(value)); + assertThat(map.putIfAbsent(key, new Object()), is(value)); + assertThat(map.remove(key), is(value)); + assertThat(map.containsKey(key), is(false)); + } + + private static class MapTestCase { + + private final WeakConcurrentMap map; + + public MapTestCase(WeakConcurrentMap map) { + this.map = map; + } + + void doTest() throws Exception { + Object key1 = new Object(), + value1 = new Object(), + key2 = new Object(), + value2 = new Object(), + key3 = new Object(), + value3 = new Object(), + key4 = new Object(), + value4 = new Object(); + map.put(key1, value1); + map.put(key2, value2); + map.put(key3, value3); + map.put(key4, value4); + assertThat(map.get(key1), is(value1)); + assertThat(map.get(key2), is(value2)); + assertThat(map.get(key3), is(value3)); + assertThat(map.get(key4), is(value4)); + Map values = new HashMap(); + values.put(key1, value1); + values.put(key2, value2); + values.put(key3, value3); + values.put(key4, value4); + for (Map.Entry entry : map) { + assertThat(values.remove(entry.getKey()), is(entry.getValue())); + } + assertThat(values.isEmpty(), is(true)); + key1 = key2 = null; // Make eligible for GC + System.gc(); + Thread.sleep(200L); + triggerClean(); + assertThat(map.get(key3), is(value3)); + assertThat(map.getIfPresent(key3), is(value3)); + assertThat(map.get(key4), is(value4)); + assertThat(map.approximateSize(), is(2)); + assertThat(map.target.size(), is(2)); + assertThat(map.remove(key3), is(value3)); + assertThat(map.get(key3), nullValue()); + assertThat(map.getIfPresent(key3), nullValue()); + assertThat(map.get(key4), is(value4)); + assertThat(map.approximateSize(), is(1)); + assertThat(map.target.size(), is(1)); + map.clear(); + assertThat(map.get(key3), nullValue()); + assertThat(map.get(key4), nullValue()); + assertThat(map.approximateSize(), is(0)); + assertThat(map.target.size(), is(0)); + assertThat(map.iterator().hasNext(), is(false)); + } + + protected void triggerClean() {} + } +} diff --git a/sdk-extensions/jfr-events/build.gradle b/sdk-extensions/jfr-events/build.gradle index e3a455051f..0fef93a068 100644 --- a/sdk-extensions/jfr-events/build.gradle +++ b/sdk-extensions/jfr-events/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'java' + id "java-library" } description = 'OpenTelemetry SDK Extension JFR' @@ -8,7 +8,6 @@ ext.moduleName = 'io.opentelemetry.sdk.extension.jfr' dependencies { implementation project(':opentelemetry-api'), project(':opentelemetry-sdk') - implementation libraries.guava } tasks.withType(JavaCompile) { diff --git a/sdk-extensions/jfr-events/src/main/java/io/opentelemetry/sdk/extension/jfr/JfrSpanProcessor.java b/sdk-extensions/jfr-events/src/main/java/io/opentelemetry/sdk/extension/jfr/JfrSpanProcessor.java index 2588b19f55..b85fbabe38 100644 --- a/sdk-extensions/jfr-events/src/main/java/io/opentelemetry/sdk/extension/jfr/JfrSpanProcessor.java +++ b/sdk-extensions/jfr-events/src/main/java/io/opentelemetry/sdk/extension/jfr/JfrSpanProcessor.java @@ -5,17 +5,13 @@ package io.opentelemetry.sdk.extension.jfr; -import com.google.common.collect.MapMaker; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; +import io.opentelemetry.context.internal.shaded.WeakConcurrentMap; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; /** * Span processor to create new JFR events for the Span as they are started, and commit on end. @@ -26,11 +22,16 @@ import java.util.Set; */ public class JfrSpanProcessor implements SpanProcessor { - private volatile Map spanEvents = - new MapMaker().concurrencyLevel(16).initialCapacity(128).weakKeys().makeMap(); + private final WeakConcurrentMap spanEvents = + new WeakConcurrentMap.WithInlinedExpunction<>(); + + private volatile boolean closed; @Override public void onStart(Context parentContext, ReadWriteSpan span) { + if (closed) { + return; + } if (span.getSpanContext().isValid()) { SpanEvent event = new SpanEvent(span.toSpanData()); event.begin(); @@ -46,7 +47,7 @@ public class JfrSpanProcessor implements SpanProcessor { @Override public void onEnd(ReadableSpan rs) { SpanEvent event = spanEvents.remove(rs.getSpanContext()); - if (event != null && event.shouldCommit()) { + if (!closed && event != null && event.shouldCommit()) { event.commit(); } } @@ -58,66 +59,7 @@ public class JfrSpanProcessor implements SpanProcessor { @Override public CompletableResultCode shutdown() { - spanEvents = new NoopMap<>(); + closed = true; return CompletableResultCode.ofSuccess(); } - - private static class NoopMap implements Map { - - @Override - public int size() { - return 0; - } - - @Override - public boolean isEmpty() { - return true; - } - - @Override - public boolean containsKey(Object key) { - return false; - } - - @Override - public boolean containsValue(Object value) { - return false; - } - - @Override - public V get(Object key) { - return null; - } - - @Override - public V put(K key, V value) { - return null; - } - - @Override - public V remove(Object key) { - return null; - } - - @Override - public void putAll(Map m) {} - - @Override - public void clear() {} - - @Override - public Set keySet() { - return Collections.emptySet(); - } - - @Override - public Collection values() { - return Collections.emptyList(); - } - - @Override - public Set> entrySet() { - return Collections.emptySet(); - } - } } diff --git a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/context/StrictContextStorage.java b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/context/StrictContextStorage.java index 1e37d56de0..5052726ebb 100644 --- a/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/context/StrictContextStorage.java +++ b/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/context/StrictContextStorage.java @@ -25,12 +25,14 @@ import static java.lang.Thread.currentThread; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; -import java.util.ArrayList; +import io.opentelemetry.context.internal.shaded.WeakConcurrentMap; import java.util.Arrays; import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; /** * A {@link ContextStorage} which keeps track of opened and closed {@link Scope}s, reporting caller @@ -76,12 +78,15 @@ public class StrictContextStorage implements ContextStorage { return new StrictContextStorage(delegate); } + // Visible for testing + static final Logger logger = Logger.getLogger(StrictContextStorage.class.getName()); + private final ContextStorage delegate; - private final BlockingQueue currentCallers; + private final PendingScopes pendingScopes; private StrictContextStorage(ContextStorage delegate) { this.delegate = delegate; - currentCallers = new LinkedBlockingDeque<>(); + pendingScopes = PendingScopes.create(); } @Override @@ -112,7 +117,7 @@ public class StrictContextStorage implements ContextStorage { stackTrace = Arrays.copyOfRange(stackTrace, from, stackTrace.length); caller.setStackTrace(stackTrace); - return new StrictScope(scope, caller, currentCallers); + return new StrictScope(scope, caller); } @Override @@ -132,35 +137,34 @@ public class StrictContextStorage implements ContextStorage { */ // AssertionError to ensure test runners render the stack trace public void ensureAllClosed() { - List leakedCallers = new ArrayList<>(); - currentCallers.drainTo(leakedCallers); - for (CallerStackTrace caller : leakedCallers) { - // Sometimes unit test runners truncate the cause of the exception. - // This flattens the exception as the caller of close() isn't important vs the one that leaked - AssertionError toThrow = - new AssertionError( - "Thread [" + caller.threadName + "] opened a scope of " + caller.context + " here:"); - toThrow.setStackTrace(caller.getStackTrace()); - throw toThrow; + pendingScopes.expungeStaleEntries(); + List leaked = pendingScopes.drainPendingCallers(); + if (!leaked.isEmpty()) { + if (leaked.size() > 1) { + logger.log(Level.SEVERE, "Multiple scopes leaked - first will be thrown as an error."); + for (CallerStackTrace caller : leaked) { + logger.log(Level.SEVERE, "Scope leaked", callerError(caller)); + } + } + throw callerError(leaked.get(0)); } } - private static final class StrictScope implements Scope { + final class StrictScope implements Scope { final Scope delegate; - final BlockingQueue currentCallers; final CallerStackTrace caller; - private StrictScope( - Scope delegate, CallerStackTrace caller, BlockingQueue currentCallers) { + StrictScope(Scope delegate, CallerStackTrace caller) { this.delegate = delegate; - this.currentCallers = currentCallers; this.caller = caller; - this.currentCallers.add(caller); + pendingScopes.put(this, caller); } @Override public void close() { - currentCallers.remove(caller); + caller.closed = true; + pendingScopes.remove(this); + if (currentThread().getId() != caller.threadId) { throw new IllegalStateException( String.format( @@ -177,7 +181,7 @@ public class StrictContextStorage implements ContextStorage { } } - private static class CallerStackTrace extends Throwable { + static class CallerStackTrace extends Throwable { private static final long serialVersionUID = 783294061323215387L; @@ -185,9 +189,67 @@ public class StrictContextStorage implements ContextStorage { final long threadId = currentThread().getId(); final Context context; + volatile boolean closed; + CallerStackTrace(Context context) { super("Thread [" + currentThread().getName() + "] opened scope for " + context + " here:"); this.context = context; } } + + static class PendingScopes extends WeakConcurrentMap { + + static PendingScopes create() { + return new PendingScopes(new ConcurrentHashMap<>()); + } + + // We need to explicitly pass a map to the constructor because we otherwise cannot remove from + // it. https://github.com/raphw/weak-lock-free/pull/12 + private final ConcurrentHashMap, CallerStackTrace> map; + + @SuppressWarnings("ThreadPriorityCheck") + PendingScopes(ConcurrentHashMap, CallerStackTrace> map) { + super(/* cleanerThread= */ false, /* reuseKeys= */ false, map); + this.map = map; + // Start cleaner thread ourselves to make sure it runs after initializing our fields. + Thread thread = new Thread(this); + thread.setName("weak-ref-cleaner-strictcontextstorage"); + thread.setPriority(Thread.MIN_PRIORITY); + thread.setDaemon(true); + thread.start(); + } + + List drainPendingCallers() { + List pendingCallers = + map.values().stream().filter(caller -> !caller.closed).collect(Collectors.toList()); + map.clear(); + return pendingCallers; + } + + // Called by cleaner thread. + @Override + public void run() { + try { + while (!Thread.interrupted()) { + CallerStackTrace caller = map.remove(remove()); + if (caller != null && !caller.closed) { + logger.log( + Level.SEVERE, "Scope garbage collected before being closed.", callerError(caller)); + } + } + } catch (InterruptedException ignored) { + // do nothing + } + } + } + + static AssertionError callerError(CallerStackTrace caller) { + // Sometimes unit test runners truncate the cause of the exception. + // This flattens the exception as the caller of close() isn't important vs the one that leaked + AssertionError toThrow = + new AssertionError( + "Thread [" + caller.threadName + "] opened a scope of " + caller.context + " here:"); + toThrow.setStackTrace(caller.getStackTrace()); + return toThrow; + } } diff --git a/sdk/testing/src/test/java/io/opentelemetry/sdk/testing/context/StrictContextStorageTest.java b/sdk/testing/src/test/java/io/opentelemetry/sdk/testing/context/StrictContextStorageTest.java index b6a050eea9..99928f2506 100644 --- a/sdk/testing/src/test/java/io/opentelemetry/sdk/testing/context/StrictContextStorageTest.java +++ b/sdk/testing/src/test/java/io/opentelemetry/sdk/testing/context/StrictContextStorageTest.java @@ -22,6 +22,7 @@ package io.opentelemetry.sdk.testing.context; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; @@ -32,8 +33,13 @@ import io.opentelemetry.context.ContextKey; import io.opentelemetry.context.ContextStorage; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.trace.IdGenerator; +import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -172,4 +178,53 @@ class StrictContextStorageTest { static void assertStackTraceStartsWithMethod(Throwable throwable, String methodName) { assertThat(throwable.getStackTrace()[0].getMethodName()).isEqualTo(methodName); } + + @Test + @SuppressWarnings("UnusedVariable") + void multipleLeaks() { + Scope scope1 = Context.current().with(ANIMAL, "cat").makeCurrent(); + Scope scope2 = Context.current().with(ANIMAL, "dog").makeCurrent(); + assertThatThrownBy(strictStorage::ensureAllClosed).isInstanceOf(AssertionError.class); + } + + @Test + void garbageCollectedScope() { + Logger logger = StrictContextStorage.logger; + AtomicReference logged = new AtomicReference<>(); + Handler handler = + new Handler() { + @Override + public void publish(LogRecord record) { + logged.set(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + }; + logger.addHandler(handler); + logger.setUseParentHandlers(false); + try { + Context.current().with(ANIMAL, "cat").makeCurrent(); + + await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + System.gc(); + assertThat(logged).doesNotHaveValue(null); + LogRecord record = logged.get(); + assertThat(record.getLevel()).isEqualTo(Level.SEVERE); + assertThat(record.getMessage()) + .isEqualTo("Scope garbage collected before being closed."); + assertThat(record.getThrown().getMessage()) + .matches("Thread \\[Test worker\\] opened a scope of .* here:"); + }); + } finally { + logger.removeHandler(handler); + logger.setUseParentHandlers(true); + } + } }