PrometheusHttpServer prevent concurrent reads when reusable memory mode (#6371)
This commit is contained in:
		
							parent
							
								
									a5fc312d26
								
							
						
					
					
						commit
						8f791f2c61
					
				|  | @ -23,6 +23,7 @@ import java.io.IOException; | ||||||
| import java.io.UncheckedIOException; | import java.io.UncheckedIOException; | ||||||
| import java.net.InetSocketAddress; | import java.net.InetSocketAddress; | ||||||
| import java.util.concurrent.ExecutorService; | import java.util.concurrent.ExecutorService; | ||||||
|  | import java.util.concurrent.Executors; | ||||||
| import java.util.concurrent.TimeUnit; | import java.util.concurrent.TimeUnit; | ||||||
| import java.util.function.Predicate; | import java.util.function.Predicate; | ||||||
| import javax.annotation.Nullable; | import javax.annotation.Nullable; | ||||||
|  | @ -70,6 +71,12 @@ public final class PrometheusHttpServer implements MetricReader { | ||||||
|     this.memoryMode = memoryMode; |     this.memoryMode = memoryMode; | ||||||
|     this.prometheusRegistry = prometheusRegistry; |     this.prometheusRegistry = prometheusRegistry; | ||||||
|     prometheusRegistry.register(prometheusMetricReader); |     prometheusRegistry.register(prometheusMetricReader); | ||||||
|  |     // When memory mode is REUSABLE_DATA, concurrent reads lead to data corruption. To prevent this, | ||||||
|  |     // we configure prometheus with a single thread executor such that requests are handled | ||||||
|  |     // sequentially. | ||||||
|  |     if (memoryMode == MemoryMode.REUSABLE_DATA) { | ||||||
|  |       executor = Executors.newSingleThreadExecutor(); | ||||||
|  |     } | ||||||
|     try { |     try { | ||||||
|       this.httpServer = |       this.httpServer = | ||||||
|           HTTPServer.builder() |           HTTPServer.builder() | ||||||
|  |  | ||||||
|  | @ -11,6 +11,7 @@ import static java.util.Objects.requireNonNull; | ||||||
| import io.opentelemetry.sdk.common.export.MemoryMode; | import io.opentelemetry.sdk.common.export.MemoryMode; | ||||||
| import io.prometheus.metrics.model.registry.PrometheusRegistry; | import io.prometheus.metrics.model.registry.PrometheusRegistry; | ||||||
| import java.util.concurrent.ExecutorService; | import java.util.concurrent.ExecutorService; | ||||||
|  | import java.util.concurrent.Executors; | ||||||
| import java.util.function.Predicate; | import java.util.function.Predicate; | ||||||
| import javax.annotation.Nullable; | import javax.annotation.Nullable; | ||||||
| 
 | 
 | ||||||
|  | @ -93,7 +94,13 @@ public final class PrometheusHttpServerBuilder { | ||||||
|     return this; |     return this; | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   /** Set the {@link MemoryMode}. */ |   /** | ||||||
|  |    * Set the {@link MemoryMode}. | ||||||
|  |    * | ||||||
|  |    * <p>If set to {@link MemoryMode#REUSABLE_DATA}, requests are served sequentially which is | ||||||
|  |    * accomplished by overriding {@link #setExecutor(ExecutorService)} to {@link | ||||||
|  |    * Executors#newSingleThreadExecutor()}. | ||||||
|  |    */ | ||||||
|   public PrometheusHttpServerBuilder setMemoryMode(MemoryMode memoryMode) { |   public PrometheusHttpServerBuilder setMemoryMode(MemoryMode memoryMode) { | ||||||
|     requireNonNull(memoryMode, "memoryMode"); |     requireNonNull(memoryMode, "memoryMode"); | ||||||
|     this.memoryMode = memoryMode; |     this.memoryMode = memoryMode; | ||||||
|  |  | ||||||
|  | @ -24,6 +24,7 @@ import io.github.netmikey.logunit.api.LogCapturer; | ||||||
| import io.opentelemetry.api.common.Attributes; | import io.opentelemetry.api.common.Attributes; | ||||||
| import io.opentelemetry.internal.testing.slf4j.SuppressLogger; | import io.opentelemetry.internal.testing.slf4j.SuppressLogger; | ||||||
| import io.opentelemetry.sdk.common.InstrumentationScopeInfo; | import io.opentelemetry.sdk.common.InstrumentationScopeInfo; | ||||||
|  | import io.opentelemetry.sdk.common.export.MemoryMode; | ||||||
| import io.opentelemetry.sdk.metrics.data.AggregationTemporality; | import io.opentelemetry.sdk.metrics.data.AggregationTemporality; | ||||||
| import io.opentelemetry.sdk.metrics.data.MetricData; | import io.opentelemetry.sdk.metrics.data.MetricData; | ||||||
| import io.opentelemetry.sdk.metrics.export.CollectionRegistration; | import io.opentelemetry.sdk.metrics.export.CollectionRegistration; | ||||||
|  | @ -39,6 +40,7 @@ import java.io.ByteArrayInputStream; | ||||||
| import java.io.IOException; | import java.io.IOException; | ||||||
| import java.net.ServerSocket; | import java.net.ServerSocket; | ||||||
| import java.nio.charset.StandardCharsets; | import java.nio.charset.StandardCharsets; | ||||||
|  | import java.util.ArrayList; | ||||||
| import java.util.Collection; | import java.util.Collection; | ||||||
| import java.util.Collections; | import java.util.Collections; | ||||||
| import java.util.List; | import java.util.List; | ||||||
|  | @ -47,6 +49,7 @@ import java.util.concurrent.Executors; | ||||||
| import java.util.concurrent.ScheduledExecutorService; | import java.util.concurrent.ScheduledExecutorService; | ||||||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | import java.util.concurrent.ScheduledThreadPoolExecutor; | ||||||
| import java.util.concurrent.ThreadPoolExecutor; | import java.util.concurrent.ThreadPoolExecutor; | ||||||
|  | import java.util.concurrent.atomic.AtomicBoolean; | ||||||
| import java.util.concurrent.atomic.AtomicReference; | import java.util.concurrent.atomic.AtomicReference; | ||||||
| import java.util.function.Predicate; | import java.util.function.Predicate; | ||||||
| import java.util.zip.GZIPInputStream; | import java.util.zip.GZIPInputStream; | ||||||
|  | @ -129,6 +132,68 @@ class PrometheusHttpServerTest { | ||||||
|                 + "target_info{kr=\"vr\"} 1\n"); |                 + "target_info{kr=\"vr\"} 1\n"); | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |   @Test | ||||||
|  |   void fetch_ReusableMemoryMode() throws InterruptedException { | ||||||
|  |     try (PrometheusHttpServer prometheusServer = | ||||||
|  |         PrometheusHttpServer.builder() | ||||||
|  |             .setHost("localhost") | ||||||
|  |             .setPort(0) | ||||||
|  |             .setMemoryMode(MemoryMode.REUSABLE_DATA) | ||||||
|  |             .build()) { | ||||||
|  |       AtomicBoolean collectInProgress = new AtomicBoolean(); | ||||||
|  |       AtomicBoolean concurrentRead = new AtomicBoolean(); | ||||||
|  |       prometheusServer.register( | ||||||
|  |           new CollectionRegistration() { | ||||||
|  |             @Override | ||||||
|  |             public Collection<MetricData> collectAllMetrics() { | ||||||
|  |               if (!collectInProgress.compareAndSet(false, true)) { | ||||||
|  |                 concurrentRead.set(true); | ||||||
|  |               } | ||||||
|  |               Collection<MetricData> response = metricData.get(); | ||||||
|  |               try { | ||||||
|  |                 Thread.sleep(1); | ||||||
|  |               } catch (InterruptedException e) { | ||||||
|  |                 throw new RuntimeException(e); | ||||||
|  |               } | ||||||
|  |               if (!collectInProgress.compareAndSet(true, false)) { | ||||||
|  |                 concurrentRead.set(true); | ||||||
|  |               } | ||||||
|  |               return response; | ||||||
|  |             } | ||||||
|  |           }); | ||||||
|  | 
 | ||||||
|  |       WebClient client = | ||||||
|  |           WebClient.builder("http://localhost:" + prometheusServer.getAddress().getPort()) | ||||||
|  |               .decorator(RetryingClient.newDecorator(RetryRule.failsafe())) | ||||||
|  |               .build(); | ||||||
|  | 
 | ||||||
|  |       // Spin up 4 threads calling /metrics simultaneously. If concurrent read happens, | ||||||
|  |       // collectAllMetrics will set concurrentRead to true and the test will fail. | ||||||
|  |       List<Thread> threads = new ArrayList<>(); | ||||||
|  |       for (int i = 0; i < 4; i++) { | ||||||
|  |         Thread thread = | ||||||
|  |             new Thread( | ||||||
|  |                 () -> { | ||||||
|  |                   for (int j = 0; j < 10; j++) { | ||||||
|  |                     AggregatedHttpResponse response = client.get("/metrics").aggregate().join(); | ||||||
|  |                     assertThat(response.status()).isEqualTo(HttpStatus.OK); | ||||||
|  |                   } | ||||||
|  |                 }); | ||||||
|  |         thread.setDaemon(true); | ||||||
|  |         thread.start(); | ||||||
|  |         threads.add(thread); | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       // Wait for threads to complete | ||||||
|  |       for (Thread thread : threads) { | ||||||
|  |         thread.join(); | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|  |       // Confirm no concurrent reads took place | ||||||
|  |       assertThat(concurrentRead.get()).isFalse(); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|   @Test |   @Test | ||||||
|   void fetchOpenMetrics() { |   void fetchOpenMetrics() { | ||||||
|     AggregatedHttpResponse response = |     AggregatedHttpResponse response = | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue