Detect GC leaks of scopes in StrictContextStorage. (#2164)

* Detect GC leaks of scopes in StrictContextStorage.

* More

* Finish

* Force GC more aggressively

* Cleanup

* Vendor code directly

* Copy test too

* Try waiting more

* ep

* oops

* Remove from build.gradle

* Drift

* Log on multiple

* Cleaner ourselves.

* EP

* Move into if

* Revert accidental
This commit is contained in:
Anuraag Agrawal 2020-12-04 13:04:27 +09:00 committed by GitHub
parent 643b697106
commit 8697de9afa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 949 additions and 98 deletions

View File

@ -19,7 +19,3 @@ dependencies {
testImplementation libraries.jqf,
libraries.guava_testlib
}
javadoc {
exclude 'io/opentelemetry/internal/**'
}

View File

@ -200,6 +200,10 @@ configure(opentelemetryProjects) {
withSourcesJar()
}
javadoc {
exclude 'io/opentelemetry/internal/**'
}
tasks {
def testJava8 = register('testJava8', Test) {
javaLauncher = javaToolchains.launcherFor {

View File

@ -0,0 +1,374 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright Rafael Winterhalter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Suppress warnings since this is vendored as-is.
// CHECKSTYLE:OFF
package io.opentelemetry.context.internal.shaded;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A thread-safe map with weak keys. Entries are based on a key's system hash code and keys are
* considered equal only by reference equality. This class offers an abstract-base implementation
* that allows to override methods. This class does not implement the {@link Map} interface because
* this implementation is incompatible with the map contract. While iterating over a map's entries,
* any key that has not passed iteration is referenced non-weakly.
*
* <p>This class has been copied as is from
* https://github.com/raphw/weak-lock-free/blob/ad0e5e0c04d4a31f9485bf12b89afbc9d75473b3/src/main/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMap.java
* This is used in multiple artifacts in OpenTelemetry and while it is in our internal API,
* generally backwards compatible changes should not be made to avoid a situation where different
* versions of OpenTelemetry artifacts become incompatible with each other.
*/
// Suppress warnings since this is vendored as-is.
@SuppressWarnings({"MissingSummary", "EqualsBrokenForNull", "FieldMissingNullable"})
public abstract class AbstractWeakConcurrentMap<K, V, L> extends ReferenceQueue<K>
implements Runnable, Iterable<Map.Entry<K, V>> {
final ConcurrentMap<WeakKey<K>, V> target;
protected AbstractWeakConcurrentMap() {
this(new ConcurrentHashMap<WeakKey<K>, V>());
}
/** @param target ConcurrentMap implementation that this class wraps. */
protected AbstractWeakConcurrentMap(ConcurrentMap<WeakKey<K>, V> target) {
this.target = target;
}
/**
* Override with care as it can cause lookup failures if done incorrectly. The result must have
* the same {@link Object#hashCode()} as the input and be {@link Object#equals(Object) equal to} a
* weak reference of the key. When overriding this, also override {@link #resetLookupKey}.
*/
protected abstract L getLookupKey(K key);
/** Resets any reusable state in the {@linkplain #getLookupKey lookup key}. */
protected abstract void resetLookupKey(L lookupKey);
/**
* @param key The key of the entry.
* @return The value of the entry or the default value if it did not exist.
*/
public V get(K key) {
if (key == null) throw new NullPointerException();
V value;
L lookupKey = getLookupKey(key);
try {
value = target.get(lookupKey);
} finally {
resetLookupKey(lookupKey);
}
if (value == null) {
value = defaultValue(key);
if (value != null) {
V previousValue = target.putIfAbsent(new WeakKey<K>(key, this), value);
if (previousValue != null) {
value = previousValue;
}
}
}
return value;
}
/**
* @param key The key of the entry.
* @return The value of the entry or null if it did not exist.
*/
public V getIfPresent(K key) {
if (key == null) throw new NullPointerException();
L lookupKey = getLookupKey(key);
try {
return target.get(lookupKey);
} finally {
resetLookupKey(lookupKey);
}
}
/**
* @param key The key of the entry.
* @return {@code true} if the key already defines a value.
*/
public boolean containsKey(K key) {
if (key == null) throw new NullPointerException();
L lookupKey = getLookupKey(key);
try {
return target.containsKey(lookupKey);
} finally {
resetLookupKey(lookupKey);
}
}
/**
* @param key The key of the entry.
* @param value The value of the entry.
* @return The previous entry or {@code null} if it does not exist.
*/
public V put(K key, V value) {
if (key == null || value == null) throw new NullPointerException();
return target.put(new WeakKey<K>(key, this), value);
}
/**
* @param key The key of the entry.
* @param value The value of the entry.
* @return The previous entry or {@code null} if it does not exist.
*/
public V putIfAbsent(K key, V value) {
if (key == null || value == null) throw new NullPointerException();
V previous;
L lookupKey = getLookupKey(key);
try {
previous = target.get(lookupKey);
} finally {
resetLookupKey(lookupKey);
}
return previous == null ? target.putIfAbsent(new WeakKey<K>(key, this), value) : previous;
}
/**
* @param key The key of the entry.
* @param value The value of the entry.
* @return The previous entry or {@code null} if it does not exist.
*/
public V putIfProbablyAbsent(K key, V value) {
if (key == null || value == null) throw new NullPointerException();
return target.putIfAbsent(new WeakKey<K>(key, this), value);
}
/**
* @param key The key of the entry.
* @return The removed entry or {@code null} if it does not exist.
*/
public V remove(K key) {
if (key == null) throw new NullPointerException();
L lookupKey = getLookupKey(key);
try {
return target.remove(lookupKey);
} finally {
resetLookupKey(lookupKey);
}
}
/** Clears the entire map. */
public void clear() {
target.clear();
}
/**
* Creates a default value. There is no guarantee that the requested value will be set as a once
* it is created in case that another thread requests a value for a key concurrently.
*
* @param key The key for which to create a default value.
* @return The default value for a key without value or {@code null} for not defining a default
* value.
*/
protected V defaultValue(K key) {
return null;
}
/** Cleans all unused references. */
public void expungeStaleEntries() {
Reference<?> reference;
while ((reference = poll()) != null) {
target.remove(reference);
}
}
/**
* Returns the approximate size of this map where the returned number is at least as big as the
* actual number of entries.
*
* @return The minimum size of this map.
*/
public int approximateSize() {
return target.size();
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
target.remove(remove());
}
} catch (InterruptedException ignored) {
// do nothing
}
}
@Override
public Iterator<Map.Entry<K, V>> iterator() {
return new EntryIterator(target.entrySet().iterator());
}
@Override
public String toString() {
return target.toString();
}
/*
* Why this works:
* ---------------
*
* Note that this map only supports reference equality for keys and uses system hash codes. Also, for the
* WeakKey instances to function correctly, we are voluntarily breaking the Java API contract for
* hashCode/equals of these instances.
*
* System hash codes are immutable and can therefore be computed prematurely and are stored explicitly
* within the WeakKey instances. This way, we always know the correct hash code of a key and always
* end up in the correct bucket of our target map. This remains true even after the weakly referenced
* key is collected.
*
* If we are looking up the value of the current key via WeakConcurrentMap::get or any other public
* API method, we know that any value associated with this key must still be in the map as the mere
* existence of this key makes it ineligible for garbage collection. Therefore, looking up a value
* using another WeakKey wrapper guarantees a correct result.
*
* If we are looking up the map entry of a WeakKey after polling it from the reference queue, we know
* that the actual key was already collected and calling WeakKey::get returns null for both the polled
* instance and the instance within the map. Since we explicitly stored the identity hash code for the
* referenced value, it is however trivial to identify the correct bucket. From this bucket, the first
* weak key with a null reference is removed. Due to hash collision, we do not know if this entry
* represents the weak key. However, we do know that the reference queue polls at least as many weak
* keys as there are stale map entries within the target map. If no key is ever removed from the map
* explicitly, the reference queue eventually polls exactly as many weak keys as there are stale entries.
*
* Therefore, we can guarantee that there is no memory leak.
*
* It is the responsibility of the actual map implementation to implement a lookup key that is used for
* lookups. The lookup key must supply the same semantics as the weak key with regards to hash code.
* The weak key invokes the latent key's equality method upon evaluation.
*/
public static final class WeakKey<K> extends WeakReference<K> {
private final int hashCode;
WeakKey(K key, ReferenceQueue<? super K> queue) {
super(key, queue);
hashCode = System.identityHashCode(key);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public boolean equals(Object other) {
if (other instanceof WeakKey<?>) {
return ((WeakKey<?>) other).get() == get();
} else {
return other.equals(this);
}
}
@Override
public String toString() {
return String.valueOf(get());
}
}
private class EntryIterator implements Iterator<Map.Entry<K, V>> {
private final Iterator<Map.Entry<WeakKey<K>, V>> iterator;
private Map.Entry<WeakKey<K>, V> nextEntry;
private K nextKey;
private EntryIterator(Iterator<Map.Entry<WeakKey<K>, V>> iterator) {
this.iterator = iterator;
findNext();
}
private void findNext() {
while (iterator.hasNext()) {
nextEntry = iterator.next();
nextKey = nextEntry.getKey().get();
if (nextKey != null) {
return;
}
}
nextEntry = null;
nextKey = null;
}
@Override
public boolean hasNext() {
return nextKey != null;
}
@Override
public Map.Entry<K, V> next() {
if (nextKey == null) {
throw new NoSuchElementException();
}
try {
return new SimpleEntry(nextKey, nextEntry);
} finally {
findNext();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
private class SimpleEntry implements Map.Entry<K, V> {
private final K key;
final Map.Entry<WeakKey<K>, V> entry;
private SimpleEntry(K key, Map.Entry<WeakKey<K>, V> entry) {
this.key = key;
this.entry = entry;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return entry.getValue();
}
@Override
public V setValue(V value) {
if (value == null) throw new NullPointerException();
return entry.setValue(value);
}
}
}

View File

@ -0,0 +1,241 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright Rafael Winterhalter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Suppress warnings since this is vendored as-is.
// CHECKSTYLE:OFF
package io.opentelemetry.context.internal.shaded;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* A thread-safe map with weak keys. Entries are based on a key's system hash code and keys are
* considered equal only by reference equality. This class does not implement the {@link
* java.util.Map} interface because this implementation is incompatible with the map contract. While
* iterating over a map's entries, any key that has not passed iteration is referenced non-weakly.
*
* <p>This class has been copied as is from
* https://github.com/raphw/weak-lock-free/blob/ad0e5e0c04d4a31f9485bf12b89afbc9d75473b3/src/main/java/com/blogspot/mydailyjava/weaklockfree/WeakConcurrentMap.java
* This is used in multiple artifacts in OpenTelemetry and while it is in our internal API,
* generally backwards compatible changes should not be made to avoid a situation where different
* versions of OpenTelemetry artifacts become incompatible with each other.
*/
// Suppress warnings since this is copied as-is.
@SuppressWarnings({
"MissingSummary",
"UngroupedOverloads",
"ThreadPriorityCheck",
"FieldMissingNullable"
})
public class WeakConcurrentMap<K, V>
extends AbstractWeakConcurrentMap<K, V, WeakConcurrentMap.LookupKey<K>> {
/**
* Lookup keys are cached thread-locally to avoid allocations on lookups. This is beneficial as
* the JIT unfortunately can't reliably replace the {@link LookupKey} allocation with stack
* allocations, even though the {@link LookupKey} does not escape.
*/
private static final ThreadLocal<LookupKey<?>> LOOKUP_KEY_CACHE =
new ThreadLocal<LookupKey<?>>() {
@Override
protected LookupKey<?> initialValue() {
return new LookupKey<Object>();
}
};
private static final AtomicLong ID = new AtomicLong();
private final Thread thread;
private final boolean reuseKeys;
/** @param cleanerThread {@code true} if a thread should be started that removes stale entries. */
public WeakConcurrentMap(boolean cleanerThread) {
this(cleanerThread, isPersistentClassLoader(LookupKey.class.getClassLoader()));
}
/**
* Checks whether the provided {@link ClassLoader} may be unloaded like a web application class
* loader, for example.
*
* <p>If the class loader can't be unloaded, it is safe to use {@link ThreadLocal}s and to reuse
* the {@link LookupKey}. Otherwise, the use of {@link ThreadLocal}s may lead to class loader
* leaks as it prevents the class loader this class is loaded by to unload.
*
* @param classLoader The class loader to check.
* @return {@code true} if the provided class loader can be unloaded.
*/
private static boolean isPersistentClassLoader(ClassLoader classLoader) {
try {
return classLoader == null // bootstrap class loader
|| classLoader == ClassLoader.getSystemClassLoader()
|| classLoader
== ClassLoader.getSystemClassLoader().getParent(); // ext/platfrom class loader;
} catch (Throwable ignored) {
return false;
}
}
/**
* @param cleanerThread {@code true} if a thread should be started that removes stale entries.
* @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}.
* Note that setting this to {@code true} may result in class loader leaks. See {@link
* #isPersistentClassLoader(ClassLoader)} for more details.
*/
public WeakConcurrentMap(boolean cleanerThread, boolean reuseKeys) {
this(cleanerThread, reuseKeys, new ConcurrentHashMap<WeakKey<K>, V>());
}
/**
* @param cleanerThread {@code true} if a thread should be started that removes stale entries.
* @param reuseKeys {@code true} if the lookup keys should be reused via a {@link ThreadLocal}.
* Note that setting this to {@code true} may result in class loader leaks. See {@link
* #isPersistentClassLoader(ClassLoader)} for more details.
* @param target ConcurrentMap implementation that this class wraps.
*/
public WeakConcurrentMap(
boolean cleanerThread, boolean reuseKeys, ConcurrentMap<WeakKey<K>, V> target) {
super(target);
this.reuseKeys = reuseKeys;
if (cleanerThread) {
thread = new Thread(this);
thread.setName("weak-ref-cleaner-" + ID.getAndIncrement());
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
thread.start();
} else {
thread = null;
}
}
@Override
@SuppressWarnings("unchecked")
protected LookupKey<K> getLookupKey(K key) {
LookupKey<K> lookupKey;
if (reuseKeys) {
lookupKey = (LookupKey<K>) LOOKUP_KEY_CACHE.get();
} else {
lookupKey = new LookupKey<K>();
}
return lookupKey.withValue(key);
}
@Override
protected void resetLookupKey(LookupKey<K> lookupKey) {
lookupKey.reset();
}
/** @return The cleaner thread or {@code null} if no such thread was set. */
public Thread getCleanerThread() {
return thread;
}
/*
* A lookup key must only be used for looking up instances within a map. For this to work, it implements an identical contract for
* hash code and equals as the WeakKey implementation. At the same time, the lookup key implementation does not extend WeakReference
* and avoids the overhead that a weak reference implies.
*/
// can't use AutoClosable/try-with-resources as this project still supports Java 6
static final class LookupKey<K> {
private K key;
private int hashCode;
LookupKey<K> withValue(K key) {
this.key = key;
hashCode = System.identityHashCode(key);
return this;
}
/** Failing to reset a lookup key can lead to memory leaks as the key is strongly referenced. */
void reset() {
key = null;
hashCode = 0;
}
@Override
public boolean equals(Object other) {
if (other instanceof WeakConcurrentMap.LookupKey<?>) {
return ((LookupKey<?>) other).key == key;
} else {
return ((WeakKey<?>) other).get() == key;
}
}
@Override
public int hashCode() {
return hashCode;
}
}
/**
* A {@link WeakConcurrentMap} where stale entries are removed as a side effect of interacting
* with this map.
*/
public static class WithInlinedExpunction<K, V> extends WeakConcurrentMap<K, V> {
public WithInlinedExpunction() {
super(false);
}
@Override
public V get(K key) {
expungeStaleEntries();
return super.get(key);
}
@Override
public boolean containsKey(K key) {
expungeStaleEntries();
return super.containsKey(key);
}
@Override
public V put(K key, V value) {
expungeStaleEntries();
return super.put(key, value);
}
@Override
public V remove(K key) {
expungeStaleEntries();
return super.remove(key);
}
@Override
public Iterator<Map.Entry<K, V>> iterator() {
expungeStaleEntries();
return super.iterator();
}
@Override
public int approximateSize() {
expungeStaleEntries();
return super.approximateSize();
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright Rafael Winterhalter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Suppress warnings since this is vendored as-is.
// CHECKSTYLE:OFF
package io.opentelemetry.context.internal.shaded;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
// Suppress warnings since this is copied as-is.
@SuppressWarnings({
"overrides",
"UnusedVariable",
"EqualsHashCode",
"MultiVariableDeclaration",
})
class WeakConcurrentMapTest {
@Test
void testLocalExpunction() throws Exception {
final WeakConcurrentMap.WithInlinedExpunction<Object, Object> map =
new WeakConcurrentMap.WithInlinedExpunction<Object, Object>();
assertThat(map.getCleanerThread(), nullValue(Thread.class));
new MapTestCase(map) {
@Override
protected void triggerClean() {
map.expungeStaleEntries();
}
}.doTest();
}
@Test
void testExternalThread() throws Exception {
WeakConcurrentMap<Object, Object> map = new WeakConcurrentMap<Object, Object>(false);
assertThat(map.getCleanerThread(), nullValue(Thread.class));
Thread thread = new Thread(map);
thread.start();
new MapTestCase(map).doTest();
thread.interrupt();
Thread.sleep(200L);
assertThat(thread.isAlive(), is(false));
}
@Test
void testInternalThread() throws Exception {
WeakConcurrentMap<Object, Object> map = new WeakConcurrentMap<Object, Object>(true);
assertThat(map.getCleanerThread(), not(nullValue(Thread.class)));
new MapTestCase(map).doTest();
map.getCleanerThread().interrupt();
Thread.sleep(200L);
assertThat(map.getCleanerThread().isAlive(), is(false));
}
static class KeyEqualToWeakRefOfItself {
@Override
public boolean equals(Object obj) {
if (obj instanceof WeakReference<?>) {
return equals(((WeakReference<?>) obj).get());
}
return super.equals(obj);
}
}
static class CheapUnloadableWeakConcurrentMap
extends AbstractWeakConcurrentMap<KeyEqualToWeakRefOfItself, Object, Object> {
@Override
protected Object getLookupKey(KeyEqualToWeakRefOfItself key) {
return key;
}
@Override
protected void resetLookupKey(Object lookupKey) {}
}
@Test
void testKeyWithWeakRefEquals() {
CheapUnloadableWeakConcurrentMap map = new CheapUnloadableWeakConcurrentMap();
KeyEqualToWeakRefOfItself key = new KeyEqualToWeakRefOfItself();
Object value = new Object();
map.put(key, value);
assertThat(map.containsKey(key), is(true));
assertThat(map.get(key), is(value));
assertThat(map.putIfAbsent(key, new Object()), is(value));
assertThat(map.remove(key), is(value));
assertThat(map.containsKey(key), is(false));
}
private static class MapTestCase {
private final WeakConcurrentMap<Object, Object> map;
public MapTestCase(WeakConcurrentMap<Object, Object> map) {
this.map = map;
}
void doTest() throws Exception {
Object key1 = new Object(),
value1 = new Object(),
key2 = new Object(),
value2 = new Object(),
key3 = new Object(),
value3 = new Object(),
key4 = new Object(),
value4 = new Object();
map.put(key1, value1);
map.put(key2, value2);
map.put(key3, value3);
map.put(key4, value4);
assertThat(map.get(key1), is(value1));
assertThat(map.get(key2), is(value2));
assertThat(map.get(key3), is(value3));
assertThat(map.get(key4), is(value4));
Map<Object, Object> values = new HashMap<Object, Object>();
values.put(key1, value1);
values.put(key2, value2);
values.put(key3, value3);
values.put(key4, value4);
for (Map.Entry<Object, Object> entry : map) {
assertThat(values.remove(entry.getKey()), is(entry.getValue()));
}
assertThat(values.isEmpty(), is(true));
key1 = key2 = null; // Make eligible for GC
System.gc();
Thread.sleep(200L);
triggerClean();
assertThat(map.get(key3), is(value3));
assertThat(map.getIfPresent(key3), is(value3));
assertThat(map.get(key4), is(value4));
assertThat(map.approximateSize(), is(2));
assertThat(map.target.size(), is(2));
assertThat(map.remove(key3), is(value3));
assertThat(map.get(key3), nullValue());
assertThat(map.getIfPresent(key3), nullValue());
assertThat(map.get(key4), is(value4));
assertThat(map.approximateSize(), is(1));
assertThat(map.target.size(), is(1));
map.clear();
assertThat(map.get(key3), nullValue());
assertThat(map.get(key4), nullValue());
assertThat(map.approximateSize(), is(0));
assertThat(map.target.size(), is(0));
assertThat(map.iterator().hasNext(), is(false));
}
protected void triggerClean() {}
}
}

View File

@ -1,5 +1,5 @@
plugins {
id 'java'
id "java-library"
}
description = 'OpenTelemetry SDK Extension JFR'
@ -8,7 +8,6 @@ ext.moduleName = 'io.opentelemetry.sdk.extension.jfr'
dependencies {
implementation project(':opentelemetry-api'),
project(':opentelemetry-sdk')
implementation libraries.guava
}
tasks.withType(JavaCompile) {

View File

@ -5,17 +5,13 @@
package io.opentelemetry.sdk.extension.jfr;
import com.google.common.collect.MapMaker;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.internal.shaded.WeakConcurrentMap;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
* Span processor to create new JFR events for the Span as they are started, and commit on end.
@ -26,11 +22,16 @@ import java.util.Set;
*/
public class JfrSpanProcessor implements SpanProcessor {
private volatile Map<SpanContext, SpanEvent> spanEvents =
new MapMaker().concurrencyLevel(16).initialCapacity(128).weakKeys().makeMap();
private final WeakConcurrentMap<SpanContext, SpanEvent> spanEvents =
new WeakConcurrentMap.WithInlinedExpunction<>();
private volatile boolean closed;
@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
if (closed) {
return;
}
if (span.getSpanContext().isValid()) {
SpanEvent event = new SpanEvent(span.toSpanData());
event.begin();
@ -46,7 +47,7 @@ public class JfrSpanProcessor implements SpanProcessor {
@Override
public void onEnd(ReadableSpan rs) {
SpanEvent event = spanEvents.remove(rs.getSpanContext());
if (event != null && event.shouldCommit()) {
if (!closed && event != null && event.shouldCommit()) {
event.commit();
}
}
@ -58,66 +59,7 @@ public class JfrSpanProcessor implements SpanProcessor {
@Override
public CompletableResultCode shutdown() {
spanEvents = new NoopMap<>();
closed = true;
return CompletableResultCode.ofSuccess();
}
private static class NoopMap<K, V> implements Map<K, V> {
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean containsKey(Object key) {
return false;
}
@Override
public boolean containsValue(Object value) {
return false;
}
@Override
public V get(Object key) {
return null;
}
@Override
public V put(K key, V value) {
return null;
}
@Override
public V remove(Object key) {
return null;
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {}
@Override
public void clear() {}
@Override
public Set<K> keySet() {
return Collections.emptySet();
}
@Override
public Collection<V> values() {
return Collections.emptyList();
}
@Override
public Set<Entry<K, V>> entrySet() {
return Collections.emptySet();
}
}
}

View File

@ -25,12 +25,14 @@ import static java.lang.Thread.currentThread;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import io.opentelemetry.context.internal.shaded.WeakConcurrentMap;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
* A {@link ContextStorage} which keeps track of opened and closed {@link Scope}s, reporting caller
@ -76,12 +78,15 @@ public class StrictContextStorage implements ContextStorage {
return new StrictContextStorage(delegate);
}
// Visible for testing
static final Logger logger = Logger.getLogger(StrictContextStorage.class.getName());
private final ContextStorage delegate;
private final BlockingQueue<CallerStackTrace> currentCallers;
private final PendingScopes pendingScopes;
private StrictContextStorage(ContextStorage delegate) {
this.delegate = delegate;
currentCallers = new LinkedBlockingDeque<>();
pendingScopes = PendingScopes.create();
}
@Override
@ -112,7 +117,7 @@ public class StrictContextStorage implements ContextStorage {
stackTrace = Arrays.copyOfRange(stackTrace, from, stackTrace.length);
caller.setStackTrace(stackTrace);
return new StrictScope(scope, caller, currentCallers);
return new StrictScope(scope, caller);
}
@Override
@ -132,35 +137,34 @@ public class StrictContextStorage implements ContextStorage {
*/
// AssertionError to ensure test runners render the stack trace
public void ensureAllClosed() {
List<CallerStackTrace> leakedCallers = new ArrayList<>();
currentCallers.drainTo(leakedCallers);
for (CallerStackTrace caller : leakedCallers) {
// Sometimes unit test runners truncate the cause of the exception.
// This flattens the exception as the caller of close() isn't important vs the one that leaked
AssertionError toThrow =
new AssertionError(
"Thread [" + caller.threadName + "] opened a scope of " + caller.context + " here:");
toThrow.setStackTrace(caller.getStackTrace());
throw toThrow;
pendingScopes.expungeStaleEntries();
List<CallerStackTrace> leaked = pendingScopes.drainPendingCallers();
if (!leaked.isEmpty()) {
if (leaked.size() > 1) {
logger.log(Level.SEVERE, "Multiple scopes leaked - first will be thrown as an error.");
for (CallerStackTrace caller : leaked) {
logger.log(Level.SEVERE, "Scope leaked", callerError(caller));
}
}
throw callerError(leaked.get(0));
}
}
private static final class StrictScope implements Scope {
final class StrictScope implements Scope {
final Scope delegate;
final BlockingQueue<CallerStackTrace> currentCallers;
final CallerStackTrace caller;
private StrictScope(
Scope delegate, CallerStackTrace caller, BlockingQueue<CallerStackTrace> currentCallers) {
StrictScope(Scope delegate, CallerStackTrace caller) {
this.delegate = delegate;
this.currentCallers = currentCallers;
this.caller = caller;
this.currentCallers.add(caller);
pendingScopes.put(this, caller);
}
@Override
public void close() {
currentCallers.remove(caller);
caller.closed = true;
pendingScopes.remove(this);
if (currentThread().getId() != caller.threadId) {
throw new IllegalStateException(
String.format(
@ -177,7 +181,7 @@ public class StrictContextStorage implements ContextStorage {
}
}
private static class CallerStackTrace extends Throwable {
static class CallerStackTrace extends Throwable {
private static final long serialVersionUID = 783294061323215387L;
@ -185,9 +189,67 @@ public class StrictContextStorage implements ContextStorage {
final long threadId = currentThread().getId();
final Context context;
volatile boolean closed;
CallerStackTrace(Context context) {
super("Thread [" + currentThread().getName() + "] opened scope for " + context + " here:");
this.context = context;
}
}
static class PendingScopes extends WeakConcurrentMap<Scope, CallerStackTrace> {
static PendingScopes create() {
return new PendingScopes(new ConcurrentHashMap<>());
}
// We need to explicitly pass a map to the constructor because we otherwise cannot remove from
// it. https://github.com/raphw/weak-lock-free/pull/12
private final ConcurrentHashMap<WeakKey<Scope>, CallerStackTrace> map;
@SuppressWarnings("ThreadPriorityCheck")
PendingScopes(ConcurrentHashMap<WeakKey<Scope>, CallerStackTrace> map) {
super(/* cleanerThread= */ false, /* reuseKeys= */ false, map);
this.map = map;
// Start cleaner thread ourselves to make sure it runs after initializing our fields.
Thread thread = new Thread(this);
thread.setName("weak-ref-cleaner-strictcontextstorage");
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
thread.start();
}
List<CallerStackTrace> drainPendingCallers() {
List<CallerStackTrace> pendingCallers =
map.values().stream().filter(caller -> !caller.closed).collect(Collectors.toList());
map.clear();
return pendingCallers;
}
// Called by cleaner thread.
@Override
public void run() {
try {
while (!Thread.interrupted()) {
CallerStackTrace caller = map.remove(remove());
if (caller != null && !caller.closed) {
logger.log(
Level.SEVERE, "Scope garbage collected before being closed.", callerError(caller));
}
}
} catch (InterruptedException ignored) {
// do nothing
}
}
}
static AssertionError callerError(CallerStackTrace caller) {
// Sometimes unit test runners truncate the cause of the exception.
// This flattens the exception as the caller of close() isn't important vs the one that leaked
AssertionError toThrow =
new AssertionError(
"Thread [" + caller.threadName + "] opened a scope of " + caller.context + " here:");
toThrow.setStackTrace(caller.getStackTrace());
return toThrow;
}
}

View File

@ -22,6 +22,7 @@ package io.opentelemetry.sdk.testing.context;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
@ -32,8 +33,13 @@ import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.IdGenerator;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@ -172,4 +178,53 @@ class StrictContextStorageTest {
static void assertStackTraceStartsWithMethod(Throwable throwable, String methodName) {
assertThat(throwable.getStackTrace()[0].getMethodName()).isEqualTo(methodName);
}
@Test
@SuppressWarnings("UnusedVariable")
void multipleLeaks() {
Scope scope1 = Context.current().with(ANIMAL, "cat").makeCurrent();
Scope scope2 = Context.current().with(ANIMAL, "dog").makeCurrent();
assertThatThrownBy(strictStorage::ensureAllClosed).isInstanceOf(AssertionError.class);
}
@Test
void garbageCollectedScope() {
Logger logger = StrictContextStorage.logger;
AtomicReference<LogRecord> logged = new AtomicReference<>();
Handler handler =
new Handler() {
@Override
public void publish(LogRecord record) {
logged.set(record);
}
@Override
public void flush() {}
@Override
public void close() {}
};
logger.addHandler(handler);
logger.setUseParentHandlers(false);
try {
Context.current().with(ANIMAL, "cat").makeCurrent();
await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> {
System.gc();
assertThat(logged).doesNotHaveValue(null);
LogRecord record = logged.get();
assertThat(record.getLevel()).isEqualTo(Level.SEVERE);
assertThat(record.getMessage())
.isEqualTo("Scope garbage collected before being closed.");
assertThat(record.getThrown().getMessage())
.matches("Thread \\[Test worker\\] opened a scope of .* here:");
});
} finally {
logger.removeHandler(handler);
logger.setUseParentHandlers(true);
}
}
}