From 4bde25f0674ae36f3e954f9a474183cfee07c7ed Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 17 Nov 2023 10:13:23 +0100 Subject: [PATCH] Jvm stable threads (#9839) --- .../runtimemetrics/java8/Threads.java | 133 ++++++++++++++-- .../java8/ThreadsStableSemconvTest.java | 144 ++++++++++++++++++ 2 files changed, 262 insertions(+), 15 deletions(-) create mode 100644 instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/testStableSemconv/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java index 5cc38d4221..bc9ea0f7cc 100644 --- a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/main/java/io/opentelemetry/instrumentation/runtimemetrics/java8/Threads.java @@ -5,15 +5,30 @@ package io.opentelemetry.instrumentation.runtimemetrics.java8; +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static java.util.Objects.requireNonNull; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.instrumentation.api.internal.SemconvStability; import io.opentelemetry.instrumentation.runtimemetrics.java8.internal.JmxRuntimeMetricsUtil; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.function.Consumer; +import javax.annotation.Nullable; /** * Registers measurements that generate metrics about JVM threads. @@ -30,13 +45,29 @@ import java.util.List; * process.runtime.jvm.threads.count{daemon=true} 2 * process.runtime.jvm.threads.count{daemon=false} 5 * + * + *

In case you enable the preview of stable JVM semantic conventions (e.g. by setting the {@code + * otel.semconv-stability.opt-in} system property to {@code jvm}), the metrics being exported will + * follow the + * most recent JVM semantic conventions. This is how the example above looks when stable JVM + * semconv is enabled: + * + *

+ *   jvm.thread.count{jvm.thread.daemon=true,jvm.thread.state="waiting"} 1
+ *   jvm.thread.count{jvm.thread.daemon=true,jvm.thread.state="runnable"} 2
+ *   jvm.thread.count{jvm.thread.daemon=false,jvm.thread.state="waiting"} 2
+ *   jvm.thread.count{jvm.thread.daemon=false,jvm.thread.state="runnable"} 3
+ * 
*/ public final class Threads { // Visible for testing static final Threads INSTANCE = new Threads(); - static final AttributeKey DAEMON = AttributeKey.booleanKey("daemon"); + static final AttributeKey DAEMON = booleanKey("daemon"); + static final AttributeKey JVM_THREAD_DAEMON = booleanKey("jvm.thread.daemon"); + static final AttributeKey JVM_THREAD_STATE = stringKey("jvm.thread.state"); /** Register observers for java runtime class metrics. */ public static List registerObservers(OpenTelemetry openTelemetry) { @@ -47,22 +78,94 @@ public final class Threads { List registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) { Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry); List observables = new ArrayList<>(); - observables.add( - meter - .upDownCounterBuilder("process.runtime.jvm.threads.count") - .setDescription("Number of executing threads") - .setUnit("{thread}") - .buildWithCallback( - observableMeasurement -> { - observableMeasurement.record( - threadBean.getDaemonThreadCount(), - Attributes.builder().put(DAEMON, true).build()); - observableMeasurement.record( - threadBean.getThreadCount() - threadBean.getDaemonThreadCount(), - Attributes.builder().put(DAEMON, false).build()); - })); + + if (SemconvStability.emitOldJvmSemconv()) { + observables.add( + meter + .upDownCounterBuilder("process.runtime.jvm.threads.count") + .setDescription("Number of executing threads") + .setUnit("{thread}") + .buildWithCallback( + observableMeasurement -> { + int daemonThreadCount = threadBean.getDaemonThreadCount(); + observableMeasurement.record( + daemonThreadCount, Attributes.builder().put(DAEMON, true).build()); + observableMeasurement.record( + threadBean.getThreadCount() - daemonThreadCount, + Attributes.builder().put(DAEMON, false).build()); + })); + } + + if (SemconvStability.emitStableJvmSemconv()) { + observables.add( + meter + .upDownCounterBuilder("jvm.thread.count") + .setDescription("Number of executing platform threads.") + .setUnit("{thread}") + .buildWithCallback( + isJava9OrNewer() + ? java9AndNewerCallback(threadBean) + : java8Callback(threadBean))); + } + return observables; } + @Nullable private static final MethodHandle THREAD_INFO_IS_DAEMON; + + static { + MethodHandle isDaemon; + try { + isDaemon = + MethodHandles.publicLookup() + .findVirtual(ThreadInfo.class, "isDaemon", MethodType.methodType(boolean.class)); + } catch (NoSuchMethodException | IllegalAccessException e) { + isDaemon = null; + } + THREAD_INFO_IS_DAEMON = isDaemon; + } + + private static boolean isJava9OrNewer() { + return THREAD_INFO_IS_DAEMON != null; + } + + private static Consumer java8Callback(ThreadMXBean threadBean) { + return measurement -> { + int daemonThreadCount = threadBean.getDaemonThreadCount(); + measurement.record( + daemonThreadCount, Attributes.builder().put(JVM_THREAD_DAEMON, true).build()); + measurement.record( + threadBean.getThreadCount() - daemonThreadCount, + Attributes.builder().put(JVM_THREAD_DAEMON, false).build()); + }; + } + + private static Consumer java9AndNewerCallback( + ThreadMXBean threadBean) { + return measurement -> { + Map counts = new HashMap<>(); + long[] threadIds = threadBean.getAllThreadIds(); + for (ThreadInfo threadInfo : threadBean.getThreadInfo(threadIds)) { + if (threadInfo == null) { + continue; + } + Attributes threadAttributes = threadAttributes(threadInfo); + counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1); + } + counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes)); + }; + } + + private static Attributes threadAttributes(ThreadInfo threadInfo) { + boolean isDaemon; + try { + isDaemon = (boolean) requireNonNull(THREAD_INFO_IS_DAEMON).invoke(threadInfo); + } catch (Throwable e) { + throw new IllegalStateException("Unexpected error happened during ThreadInfo#isDaemon()", e); + } + String threadState = threadInfo.getThreadState().name().toLowerCase(Locale.ROOT); + return Attributes.of(JVM_THREAD_DAEMON, isDaemon, JVM_THREAD_STATE, threadState); + } + private Threads() {} } diff --git a/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/testStableSemconv/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/testStableSemconv/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java new file mode 100644 index 0000000000..8abe98439e --- /dev/null +++ b/instrumentation/runtime-telemetry/runtime-telemetry-java8/library/src/testStableSemconv/java/io/opentelemetry/instrumentation/runtimemetrics/java8/ThreadsStableSemconvTest.java @@ -0,0 +1,144 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.runtimemetrics.java8; + +import static io.opentelemetry.instrumentation.runtimemetrics.java8.ScopeUtil.EXPECTED_SCOPE; +import static io.opentelemetry.instrumentation.runtimemetrics.java8.Threads.JVM_THREAD_DAEMON; +import static io.opentelemetry.instrumentation.runtimemetrics.java8.Threads.JVM_THREAD_STATE; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.EnabledOnJre; +import org.junit.jupiter.api.condition.JRE; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; + +@ExtendWith(MockitoExtension.class) +class ThreadsStableSemconvTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + @Mock private ThreadMXBean threadBean; + + @Test + @EnabledOnJre(JRE.JAVA_8) + void registerObservers_Java8() { + when(threadBean.getThreadCount()).thenReturn(7); + when(threadBean.getDaemonThreadCount()).thenReturn(2); + + Threads.INSTANCE + .registerObservers(testing.getOpenTelemetry(), threadBean) + .forEach(cleanup::deferCleanup); + + testing.waitAndAssertMetrics( + "io.opentelemetry.runtime-telemetry-java8", + "jvm.thread.count", + metrics -> + metrics.anySatisfy( + metricData -> + assertThat(metricData) + .hasInstrumentationScope(EXPECTED_SCOPE) + .hasDescription("Number of executing platform threads.") + .hasUnit("{thread}") + .hasLongSumSatisfying( + sum -> + sum.isNotMonotonic() + .hasPointsSatisfying( + point -> + point + .hasValue(2) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, true)), + point -> + point + .hasValue(5) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, false)))))); + } + + @Test + @EnabledForJreRange(min = JRE.JAVA_9) + void registerObservers_Java9AndNewer() { + ThreadInfo threadInfo1 = + mock(ThreadInfo.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE)); + ThreadInfo threadInfo2 = + mock(ThreadInfo.class, new ThreadInfoAnswer(true, Thread.State.WAITING)); + + long[] threadIds = {12, 32, 42}; + when(threadBean.getAllThreadIds()).thenReturn(threadIds); + when(threadBean.getThreadInfo(threadIds)) + .thenReturn(new ThreadInfo[] {threadInfo1, null, threadInfo2}); + + Threads.INSTANCE + .registerObservers(testing.getOpenTelemetry(), threadBean) + .forEach(cleanup::deferCleanup); + + testing.waitAndAssertMetrics( + "io.opentelemetry.runtime-telemetry-java8", + "jvm.thread.count", + metrics -> + metrics.anySatisfy( + metricData -> + assertThat(metricData) + .hasInstrumentationScope(EXPECTED_SCOPE) + .hasDescription("Number of executing platform threads.") + .hasUnit("{thread}") + .hasLongSumSatisfying( + sum -> + sum.isNotMonotonic() + .hasPointsSatisfying( + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, false), + equalTo(JVM_THREAD_STATE, "runnable")), + point -> + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(JVM_THREAD_DAEMON, true), + equalTo(JVM_THREAD_STATE, "waiting")))))); + } + + static final class ThreadInfoAnswer implements Answer { + + private final boolean isDaemon; + private final Thread.State state; + + ThreadInfoAnswer(boolean isDaemon, Thread.State state) { + this.isDaemon = isDaemon; + this.state = state; + } + + @Override + public Object answer(InvocationOnMock invocation) { + String methodName = invocation.getMethod().getName(); + if (methodName.equals("isDaemon")) { + return isDaemon; + } else if (methodName.equals("getThreadState")) { + return state; + } + return null; + } + } +}