From 8f791f2c6169571e468a376ee9883729edae8005 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Mon, 22 Apr 2024 09:47:15 -0500 Subject: [PATCH] PrometheusHttpServer prevent concurrent reads when reusable memory mode (#6371) --- .../prometheus/PrometheusHttpServer.java | 7 ++ .../PrometheusHttpServerBuilder.java | 9 ++- .../prometheus/PrometheusHttpServerTest.java | 65 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java index e4a1561285..f815ea0ac5 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServer.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -70,6 +71,12 @@ public final class PrometheusHttpServer implements MetricReader { this.memoryMode = memoryMode; this.prometheusRegistry = prometheusRegistry; prometheusRegistry.register(prometheusMetricReader); + // When memory mode is REUSABLE_DATA, concurrent reads lead to data corruption. To prevent this, + // we configure prometheus with a single thread executor such that requests are handled + // sequentially. + if (memoryMode == MemoryMode.REUSABLE_DATA) { + executor = Executors.newSingleThreadExecutor(); + } try { this.httpServer = HTTPServer.builder() diff --git a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java index 3c20eea3a2..2be7dee621 100644 --- a/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java +++ b/exporters/prometheus/src/main/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerBuilder.java @@ -11,6 +11,7 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.sdk.common.export.MemoryMode; import io.prometheus.metrics.model.registry.PrometheusRegistry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -93,7 +94,13 @@ public final class PrometheusHttpServerBuilder { return this; } - /** Set the {@link MemoryMode}. */ + /** + * Set the {@link MemoryMode}. + * + *
If set to {@link MemoryMode#REUSABLE_DATA}, requests are served sequentially which is
+ * accomplished by overriding {@link #setExecutor(ExecutorService)} to {@link
+ * Executors#newSingleThreadExecutor()}.
+ */
public PrometheusHttpServerBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
diff --git a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java
index 3fc114c1ed..449cc4c5c4 100644
--- a/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java
+++ b/exporters/prometheus/src/test/java/io/opentelemetry/exporter/prometheus/PrometheusHttpServerTest.java
@@ -24,6 +24,7 @@ import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
@@ -39,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -47,6 +49,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
@@ -129,6 +132,68 @@ class PrometheusHttpServerTest {
+ "target_info{kr=\"vr\"} 1\n");
}
+ @Test
+ void fetch_ReusableMemoryMode() throws InterruptedException {
+ try (PrometheusHttpServer prometheusServer =
+ PrometheusHttpServer.builder()
+ .setHost("localhost")
+ .setPort(0)
+ .setMemoryMode(MemoryMode.REUSABLE_DATA)
+ .build()) {
+ AtomicBoolean collectInProgress = new AtomicBoolean();
+ AtomicBoolean concurrentRead = new AtomicBoolean();
+ prometheusServer.register(
+ new CollectionRegistration() {
+ @Override
+ public Collection