[prometheus] Fix issue with corrupted buffers when reading both OpenMetrics and plain text formats (#5623)

Co-authored-by: Piotr Kiełkowicz <pkiekowicz@splunk.com>
Co-authored-by: Vishwesh Bankwar <vishweshbankwar@users.noreply.github.com>
Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
This commit is contained in:
Robert Coltheart 2024-05-18 05:38:16 +10:00 committed by GitHub
parent b444464d0f
commit 8177a391a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 209 additions and 88 deletions

View File

@ -2,6 +2,10 @@
## Unreleased
* Fixed an issue where response buffers could be corrupted when the same
Prometheus exporter was scraped in both OpenMetrics and plain text formats.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))
## 1.8.0-rc.1
Released 2024-Mar-27

View File

@ -57,7 +57,9 @@ internal sealed class PrometheusExporterMiddleware
try
{
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;
if (dataView.Count > 0)
{
response.StatusCode = 200;
#if NET8_0_OR_GREATER
@ -69,7 +71,7 @@ internal sealed class PrometheusExporterMiddleware
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";
await response.Body.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await response.Body.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{

View File

@ -2,6 +2,10 @@
## Unreleased
* Fixed an issue where response buffers could be corrupted when the same
Prometheus exporter was scraped in both OpenMetrics and plain text formats.
([#5623](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5623))
## 1.8.0-rc.1
Released 2024-Mar-27

View File

@ -16,11 +16,14 @@ internal sealed class PrometheusCollectionManager
private readonly Dictionary<Metric, PrometheusMetric> metricsCache;
private readonly HashSet<string> scopes;
private int metricsCacheCount;
private byte[] buffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] plainTextBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private byte[] openMetricsBuffer = new byte[85000]; // encourage the object to live in LOH (large object heap)
private int targetInfoBufferLength = -1; // zero or positive when target_info has been written for the first time
private ArraySegment<byte> previousPlainTextDataView;
private ArraySegment<byte> previousOpenMetricsDataView;
private int globalLockState;
private ArraySegment<byte> previousDataView;
private DateTime? previousDataViewGeneratedAtUtc;
private DateTime? previousPlainTextDataViewGeneratedAtUtc;
private DateTime? previousOpenMetricsDataViewGeneratedAtUtc;
private int readerCount;
private bool collectionRunning;
private TaskCompletionSource<CollectionResponse> collectionTcs;
@ -44,16 +47,20 @@ internal sealed class PrometheusCollectionManager
// If we are within {ScrapeResponseCacheDurationMilliseconds} of the
// last successful collect, return the previous view.
if (this.previousDataViewGeneratedAtUtc.HasValue
var previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;
if (previousDataViewGeneratedAtUtc.HasValue
&& this.scrapeResponseCacheDurationMilliseconds > 0
&& this.previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
&& previousDataViewGeneratedAtUtc.Value.AddMilliseconds(this.scrapeResponseCacheDurationMilliseconds) >= DateTime.UtcNow)
{
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
#if NET6_0_OR_GREATER
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return new ValueTask<CollectionResponse>(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#else
return Task.FromResult(new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: true));
return Task.FromResult(new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: true));
#endif
}
@ -78,7 +85,16 @@ internal sealed class PrometheusCollectionManager
// Start a collection on the current thread.
this.collectionRunning = true;
this.previousDataViewGeneratedAtUtc = null;
if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = null;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = null;
}
Interlocked.Increment(ref this.readerCount);
this.ExitGlobalLock();
@ -86,8 +102,20 @@ internal sealed class PrometheusCollectionManager
var result = this.ExecuteCollect(openMetricsRequested);
if (result)
{
this.previousDataViewGeneratedAtUtc = DateTime.UtcNow;
response = new CollectionResponse(this.previousDataView, this.previousDataViewGeneratedAtUtc.Value, fromCache: false);
if (openMetricsRequested)
{
this.previousOpenMetricsDataViewGeneratedAtUtc = DateTime.UtcNow;
}
else
{
this.previousPlainTextDataViewGeneratedAtUtc = DateTime.UtcNow;
}
previousDataViewGeneratedAtUtc = openMetricsRequested
? this.previousOpenMetricsDataViewGeneratedAtUtc
: this.previousPlainTextDataViewGeneratedAtUtc;
response = new CollectionResponse(this.previousOpenMetricsDataView, this.previousPlainTextDataView, previousDataViewGeneratedAtUtc.Value, fromCache: false);
}
else
{
@ -170,6 +198,7 @@ internal sealed class PrometheusCollectionManager
private ExportResult OnCollect(Batch<Metric> metrics)
{
var cursor = 0;
var buffer = this.exporter.OpenMetricsRequested ? this.openMetricsBuffer : this.plainTextBuffer;
try
{
@ -192,13 +221,13 @@ internal sealed class PrometheusCollectionManager
{
try
{
cursor = PrometheusSerializer.WriteScopeInfo(this.buffer, cursor, metric.MeterName);
cursor = PrometheusSerializer.WriteScopeInfo(buffer, cursor, metric.MeterName);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
// there are two cases we might run into the following condition:
// 1. we have many metrics to be exported - in this case we probably want
@ -226,7 +255,7 @@ internal sealed class PrometheusCollectionManager
try
{
cursor = PrometheusSerializer.WriteMetric(
this.buffer,
buffer,
cursor,
metric,
this.GetPrometheusMetric(metric),
@ -236,7 +265,7 @@ internal sealed class PrometheusCollectionManager
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
@ -248,24 +277,40 @@ internal sealed class PrometheusCollectionManager
{
try
{
cursor = PrometheusSerializer.WriteEof(this.buffer, cursor);
cursor = PrometheusSerializer.WriteEof(buffer, cursor);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref buffer))
{
throw;
}
}
}
this.previousDataView = new ArraySegment<byte>(this.buffer, 0, cursor);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(this.openMetricsBuffer, 0, cursor);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(this.plainTextBuffer, 0, cursor);
}
return ExportResult.Success;
}
catch (Exception)
{
this.previousDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
if (this.exporter.OpenMetricsRequested)
{
this.previousOpenMetricsDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}
else
{
this.previousPlainTextDataView = new ArraySegment<byte>(Array.Empty<byte>(), 0, 0);
}
return ExportResult.Failure;
}
}
@ -278,13 +323,13 @@ internal sealed class PrometheusCollectionManager
{
try
{
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.buffer, 0, this.exporter.Resource);
this.targetInfoBufferLength = PrometheusSerializer.WriteTargetInfo(this.openMetricsBuffer, 0, this.exporter.Resource);
break;
}
catch (IndexOutOfRangeException)
{
if (!this.IncreaseBufferSize())
if (!this.IncreaseBufferSize(ref this.openMetricsBuffer))
{
throw;
}
@ -295,9 +340,9 @@ internal sealed class PrometheusCollectionManager
return this.targetInfoBufferLength;
}
private bool IncreaseBufferSize()
private bool IncreaseBufferSize(ref byte[] buffer)
{
var newBufferSize = this.buffer.Length * 2;
var newBufferSize = buffer.Length * 2;
if (newBufferSize > 100 * 1024 * 1024)
{
@ -305,8 +350,8 @@ internal sealed class PrometheusCollectionManager
}
var newBuffer = new byte[newBufferSize];
this.buffer.CopyTo(newBuffer, 0);
this.buffer = newBuffer;
buffer.CopyTo(newBuffer, 0);
buffer = newBuffer;
return true;
}
@ -331,14 +376,17 @@ internal sealed class PrometheusCollectionManager
public readonly struct CollectionResponse
{
public CollectionResponse(ArraySegment<byte> view, DateTime generatedAtUtc, bool fromCache)
public CollectionResponse(ArraySegment<byte> openMetricsView, ArraySegment<byte> plainTextView, DateTime generatedAtUtc, bool fromCache)
{
this.View = view;
this.OpenMetricsView = openMetricsView;
this.PlainTextView = plainTextView;
this.GeneratedAtUtc = generatedAtUtc;
this.FromCache = fromCache;
}
public ArraySegment<byte> View { get; }
public ArraySegment<byte> OpenMetricsView { get; }
public ArraySegment<byte> PlainTextView { get; }
public DateTime GeneratedAtUtc { get; }

View File

@ -153,7 +153,10 @@ internal sealed class PrometheusHttpListener : IDisposable
try
{
context.Response.Headers.Add("Server", string.Empty);
if (collectionResponse.View.Count > 0)
var dataView = openMetricsRequested ? collectionResponse.OpenMetricsView : collectionResponse.PlainTextView;
if (dataView.Count > 0)
{
context.Response.StatusCode = 200;
context.Response.Headers.Add("Last-Modified", collectionResponse.GeneratedAtUtc.ToString("R"));
@ -161,7 +164,7 @@ internal sealed class PrometheusHttpListener : IDisposable
? "application/openmetrics-text; version=1.0.0; charset=utf-8"
: "text/plain; charset=utf-8; version=0.0.4";
await context.Response.OutputStream.WriteAsync(collectionResponse.View.Array, 0, collectionResponse.View.Count).ConfigureAwait(false);
await context.Response.OutputStream.WriteAsync(dataView.Array, 0, dataView.Count).ConfigureAwait(false);
}
else
{

View File

@ -248,6 +248,46 @@ public sealed class PrometheusExporterMiddlewareTests
acceptHeader: "application/openmetrics-text; version=1.0.0");
}
/// <summary>
/// Sends an interleaved sequence of OpenMetrics and plain text scrape requests to a
/// single test host and verifies every response with <c>VerifyAsync</c>.
/// </summary>
[Fact]
public async Task PrometheusExporterMiddlewareIntegration_CanServeOpenMetricsAndPlainFormats()
{
    using var host = await StartTestHostAsync(
        app => app.UseOpenTelemetryPrometheusScrapingEndpoint());

    var tags = new[]
    {
        new KeyValuePair<string, object>("key1", "value1"),
        new KeyValuePair<string, object>("key2", "value2"),
    };

    using var meter = new Meter(MeterName, MeterVersion);

    // Captured before any measurement is recorded; used as the lower bound for the
    // sample timestamp checked by VerifyAsync.
    var beginTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();

    var counter = meter.CreateCounter<double>("counter_double");
    counter.Add(100.18D, tags);
    counter.Add(0.99D, tags);

    using var client = host.GetTestClient();

    // true = request OpenMetrics, false = request plain text.
    foreach (var openMetricsRequested in new[] { true, false, true, true, false })
    {
        var acceptHeader = openMetricsRequested ? "application/openmetrics-text" : "text/plain";

        using var request = new HttpRequestMessage(HttpMethod.Get, new Uri("/metrics", UriKind.Relative));
        request.Headers.Add("Accept", acceptHeader);

        using var response = await client.SendAsync(request);

        var endTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();

        await VerifyAsync(beginTimestamp, endTimestamp, response, openMetricsRequested);
    }

    await host.StopAsync();
}
private static async Task RunPrometheusExporterMiddlewareIntegrationTest(
string path,
Action<IApplicationBuilder> configure,
@ -260,26 +300,7 @@ public sealed class PrometheusExporterMiddlewareTests
{
var requestOpenMetrics = acceptHeader.StartsWith("application/openmetrics-text");
using var host = await new HostBuilder()
.ConfigureWebHost(webBuilder => webBuilder
.UseTestServer()
.ConfigureServices(services =>
{
if (registerMeterProvider)
{
services.AddOpenTelemetry().WithMetrics(builder => builder
.ConfigureResource(x => x.Clear().AddService("my_service", serviceInstanceId: "id1"))
.AddMeter(MeterName)
.AddPrometheusExporter(o =>
{
configureOptions?.Invoke(o);
}));
}
configureServices?.Invoke(services);
})
.Configure(configure))
.StartAsync();
using var host = await StartTestHostAsync(configure, configureServices, registerMeterProvider, configureOptions);
var tags = new KeyValuePair<string, object>[]
{
@ -311,41 +332,7 @@ public sealed class PrometheusExporterMiddlewareTests
if (!skipMetrics)
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.True(response.Content.Headers.Contains("Last-Modified"));
if (requestOpenMetrics)
{
Assert.Equal("application/openmetrics-text; version=1.0.0; charset=utf-8", response.Content.Headers.ContentType.ToString());
}
else
{
Assert.Equal("text/plain; charset=utf-8; version=0.0.4", response.Content.Headers.ContentType.ToString());
}
string content = await response.Content.ReadAsStringAsync();
string expected = requestOpenMetrics
? "# TYPE target info\n"
+ "# HELP target Target metadata\n"
+ "target_info{service_name='my_service',service_instance_id='id1'} 1\n"
+ "# TYPE otel_scope_info info\n"
+ "# HELP otel_scope_info Scope metadata\n"
+ $"otel_scope_info{{otel_scope_name='{MeterName}'}} 1\n"
+ "# TYPE counter_double_total counter\n"
+ $"counter_double_total{{otel_scope_name='{MeterName}',otel_scope_version='{MeterVersion}',key1='value1',key2='value2'}} 101.17 (\\d+\\.\\d{{3}})\n"
+ "# EOF\n"
: "# TYPE counter_double_total counter\n"
+ $"counter_double_total{{otel_scope_name='{MeterName}',otel_scope_version='{MeterVersion}',key1='value1',key2='value2'}} 101.17 (\\d+)\n"
+ "# EOF\n";
var matches = Regex.Matches(content, ("^" + expected + "$").Replace('\'', '"'));
Assert.Single(matches);
var timestamp = long.Parse(matches[0].Groups[1].Value.Replace(".", string.Empty));
Assert.True(beginTimestamp <= timestamp && timestamp <= endTimestamp);
await VerifyAsync(beginTimestamp, endTimestamp, response, requestOpenMetrics);
}
else
{
@ -356,5 +343,78 @@ public sealed class PrometheusExporterMiddlewareTests
await host.StopAsync();
}
/// <summary>
/// Asserts that a scrape response is a 200 with a Last-Modified header, carries the
/// content type of the requested format, and has a body matching the expected
/// exposition text for the "counter_double" counter.
/// </summary>
/// <param name="beginTimestamp">Inclusive lower bound for the sample timestamp parsed from the body (callers in this file pass Unix milliseconds).</param>
/// <param name="endTimestamp">Inclusive upper bound for the sample timestamp parsed from the body (callers in this file pass Unix milliseconds).</param>
/// <param name="response">The HTTP response returned by the scraping endpoint.</param>
/// <param name="requestOpenMetrics"><c>true</c> to expect the OpenMetrics content type and body; <c>false</c> to expect plain text.</param>
private static async Task VerifyAsync(long beginTimestamp, long endTimestamp, HttpResponseMessage response, bool requestOpenMetrics)
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.True(response.Content.Headers.Contains("Last-Modified"));
if (requestOpenMetrics)
{
Assert.Equal("application/openmetrics-text; version=1.0.0; charset=utf-8", response.Content.Headers.ContentType.ToString());
}
else
{
Assert.Equal("text/plain; charset=utf-8; version=0.0.4", response.Content.Headers.ContentType.ToString());
}
// Line endings are normalized on both the actual body and the expected pattern so
// the comparison does not depend on how newlines are represented on either side.
string content = (await response.Content.ReadAsStringAsync()).ReplaceLineEndings();
// The expected text is used as a regular expression below; its single capture
// group picks up the timestamp at the end of the counter sample line.
string expected = requestOpenMetrics
? $$"""
# TYPE target info
# HELP target Target metadata
target_info{service_name="my_service",service_instance_id="id1"} 1
# TYPE otel_scope_info info
# HELP otel_scope_info Scope metadata
otel_scope_info{otel_scope_name="{{MeterName}}"} 1
# TYPE counter_double_total counter
counter_double_total{otel_scope_name="{{MeterName}}",otel_scope_version="{{MeterVersion}}",key1="value1",key2="value2"} 101.17 (\d+\.\d{3})
# EOF
""".ReplaceLineEndings()
: $$"""
# TYPE counter_double_total counter
counter_double_total{otel_scope_name="{{MeterName}}",otel_scope_version="{{MeterVersion}}",key1="value1",key2="value2"} 101.17 (\d+)
# EOF
""".ReplaceLineEndings();
// Anchored at both ends so the whole body must match the pattern exactly once.
var matches = Regex.Matches(content, "^" + expected + "$");
// The body is passed as the failure message to make mismatches easy to diagnose.
Assert.True(matches.Count == 1, content);
// Remove the decimal point (present in the OpenMetrics pattern) so the captured
// timestamp parses as an integer comparable with the supplied bounds.
var timestamp = long.Parse(matches[0].Groups[1].Value.Replace(".", string.Empty));
Assert.True(beginTimestamp <= timestamp && timestamp <= endTimestamp, $"{beginTimestamp} {timestamp} {endTimestamp}");
}
/// <summary>
/// Builds and starts a host backed by the in-memory test server, optionally
/// registering OpenTelemetry metrics with the Prometheus exporter.
/// </summary>
/// <param name="configure">Configures the application request pipeline.</param>
/// <param name="configureServices">Optional extra service registrations, invoked after the metrics registration.</param>
/// <param name="registerMeterProvider">When <c>true</c>, registers OpenTelemetry metrics for <c>MeterName</c> with the Prometheus exporter.</param>
/// <param name="configureOptions">Optional callback invoked with the Prometheus exporter options.</param>
/// <returns>The task returned by starting the host.</returns>
private static Task<IHost> StartTestHostAsync(
    Action<IApplicationBuilder> configure,
    Action<IServiceCollection> configureServices = null,
    bool registerMeterProvider = true,
    Action<PrometheusAspNetCoreOptions> configureOptions = null)
{
    // Service registration is pulled into a local function to keep the builder chain flat.
    void RegisterServices(IServiceCollection services)
    {
        if (registerMeterProvider)
        {
            services.AddOpenTelemetry().WithMetrics(metrics => metrics
                .ConfigureResource(resource => resource.Clear().AddService("my_service", serviceInstanceId: "id1"))
                .AddMeter(MeterName)
                .AddPrometheusExporter(options =>
                {
                    configureOptions?.Invoke(options);
                }));
        }

        configureServices?.Invoke(services);
    }

    var hostBuilder = new HostBuilder().ConfigureWebHost(webBuilder => webBuilder
        .UseTestServer()
        .ConfigureServices(RegisterServices)
        .Configure(configure));

    return hostBuilder.StartAsync();
}
}
#endif

View File

@ -60,7 +60,7 @@ public sealed class PrometheusCollectionManagerTests
return new Response
{
CollectionResponse = response,
ViewPayload = response.View.ToArray(),
ViewPayload = openMetricsRequested ? response.OpenMetricsView.ToArray() : response.PlainTextView.ToArray(),
};
}
finally
@ -124,7 +124,7 @@ public sealed class PrometheusCollectionManagerTests
return new Response
{
CollectionResponse = response,
ViewPayload = response.View.ToArray(),
ViewPayload = openMetricsRequested ? response.OpenMetricsView.ToArray() : response.PlainTextView.ToArray(),
};
}
finally