Catch throwable in async callbacks, and propagate if fatal (#3922)

* Catch throwable in async callbacks, and propogate if fatal

* Add unit tests, adjust propagate ordering

* Create util function for propagateIfFatal

* PR feedback
This commit is contained in:
jack-berg 2021-11-23 23:39:18 -06:00 committed by GitHub
parent 12a54710b7
commit 201934aef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 4 deletions

View File

@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.internal;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class ThrowableUtil {
/**
* Throw the {@link Throwable} if fatal.
*
* <p>Taken from RxJava throwIfFatal, which was taken from scala.
*/
public static void propagateIfFatal(Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
private ThrowableUtil() {}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.internal;
import static io.opentelemetry.sdk.internal.ThrowableUtil.propagateIfFatal;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.junit.jupiter.api.Test;
class ThrowableUtilTest {
@Test
void propagateIfFatal_Throws() {
assertThatCode(() -> propagateIfFatal(new RuntimeException("Error!")))
.doesNotThrowAnyException();
assertThatCode(() -> propagateIfFatal(new Exception("Error!"))).doesNotThrowAnyException();
assertThatThrownBy(() -> propagateIfFatal(new VirtualMachineError("Error!") {}))
.isInstanceOf(VirtualMachineError.class);
assertThatThrownBy(() -> propagateIfFatal(new ThreadDeath())).isInstanceOf(ThreadDeath.class);
assertThatThrownBy(() -> propagateIfFatal(new LinkageError())).isInstanceOf(LinkageError.class);
}
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.internal.ThrowableUtil.propagateIfFatal;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
@ -145,7 +147,8 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
try {
try {
metricUpdater.run();
} catch (RuntimeException e) {
} catch (Throwable e) {
propagateIfFatal(e);
logger.log(
Level.WARNING,
"An exception occurred invoking callback for instrument "

View File

@ -120,7 +120,7 @@ public class AsynchronousMetricStorageTest {
}
@Test
void collectAndReset_CatchesRuntimeException() {
void collectAndReset_CallbackException() {
Mockito.when(reader.getSupportedTemporality())
.thenReturn(EnumSet.allOf(AggregationTemporality.class));
@ -133,8 +133,8 @@ public class AsynchronousMetricStorageTest {
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG),
value -> {
throw new IllegalStateException("Error!");
unused -> {
throw new RuntimeException("Error!");
});
assertThat(
metricStorage.collectAndReset(