Solve PrometheusExporter race condition (step 1) (#2553)
* prom prototype * minor refactor * improve mem * skeleton of the PrometheusSerializer * a working server * clear up test * specialize netframework * clean up * update test * fix unit test * minor improvement * comment * escape metric names * handle inf/nan * minor name change * nit * more assertion * patch for old frameworks * add comment explaining why we use empty string if the label value is null * Updated PrometheusExporterMiddleware for new PrometheusSerializer API. * Added some exception handling. * Test fix. Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
This commit is contained in:
parent
05990ae783
commit
11654f19e2
|
|
@ -28,9 +28,9 @@ namespace Examples.Console
|
|||
internal class TestPrometheusExporter
|
||||
{
|
||||
private static readonly Meter MyMeter = new Meter("TestMeter");
|
||||
private static readonly Counter<long> Counter = MyMeter.CreateCounter<long>("myCounter");
|
||||
private static readonly Histogram<long> MyHistogram = MyMeter.CreateHistogram<long>("myHistogram");
|
||||
private static readonly Random RandomGenerator = new Random();
|
||||
private static readonly Counter<double> Counter = MyMeter.CreateCounter<double>("myCounter", description: "A counter for demonstration purpose.");
|
||||
private static readonly Histogram<long> MyHistogram = MyMeter.CreateHistogram<long>("myHistogram", description: "A histogram for demonstration purpose.");
|
||||
private static readonly ThreadLocal<Random> ThreadLocalRandom = new ThreadLocal<Random>(() => new Random());
|
||||
|
||||
internal static object Run(int port, int totalDurationInMins)
|
||||
{
|
||||
|
|
@ -56,43 +56,33 @@ namespace Examples.Console
|
|||
})
|
||||
.Build();
|
||||
|
||||
#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214
|
||||
ObservableGauge<long> gauge = MyMeter.CreateObservableGauge(
|
||||
"Gauge",
|
||||
"myGauge",
|
||||
() =>
|
||||
{
|
||||
var tag1 = new KeyValuePair<string, object>("tag1", "value1");
|
||||
var tag2 = new KeyValuePair<string, object>("tag2", "value2");
|
||||
|
||||
return new List<Measurement<long>>()
|
||||
{
|
||||
new Measurement<long>(RandomGenerator.Next(1, 1000), tag1, tag2),
|
||||
new Measurement<long>(ThreadLocalRandom.Value.Next(1, 1000), new("tag1", "value1"), new("tag2", "value2")),
|
||||
new Measurement<long>(ThreadLocalRandom.Value.Next(1, 1000), new("tag1", "value1"), new("tag2", "value3")),
|
||||
};
|
||||
});
|
||||
},
|
||||
description: "A gauge for demonstration purpose.");
|
||||
|
||||
using var token = new CancellationTokenSource();
|
||||
Task writeMetricTask = new Task(() =>
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
Counter.Add(
|
||||
10,
|
||||
new KeyValuePair<string, object>("tag1", "value1"),
|
||||
new KeyValuePair<string, object>("tag2", "value2"));
|
||||
|
||||
Counter.Add(
|
||||
100,
|
||||
new KeyValuePair<string, object>("tag1", "anothervalue"),
|
||||
new KeyValuePair<string, object>("tag2", "somethingelse"));
|
||||
|
||||
MyHistogram.Record(
|
||||
RandomGenerator.Next(1, 1500),
|
||||
new KeyValuePair<string, object>("tag1", "value1"),
|
||||
new KeyValuePair<string, object>("tag2", "value2"));
|
||||
Counter.Add(9.9, new("name", "apple"), new("color", "red"));
|
||||
Counter.Add(99.9, new("name", "lemon"), new("color", "yellow"));
|
||||
MyHistogram.Record(ThreadLocalRandom.Value.Next(1, 1500), new("tag1", "value1"), new("tag2", "value2"));
|
||||
|
||||
Task.Delay(10).Wait();
|
||||
}
|
||||
});
|
||||
writeMetricTask.Start();
|
||||
#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly
|
||||
|
||||
token.CancelAfter(totalDurationInMins * 60 * 1000);
|
||||
|
||||
|
|
|
|||
|
|
@ -199,18 +199,20 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
/// Serialize metrics to prometheus format.
|
||||
/// </summary>
|
||||
/// <param name="exporter"><see cref="PrometheusExporter"/>.</param>
|
||||
/// <param name="metrics">Metrics to be exported.</param>
|
||||
/// <param name="stream">Stream to write to.</param>
|
||||
/// <param name="getUtcNowDateTimeOffset">Optional function to resolve the current date & time.</param>
|
||||
/// <returns><see cref="Task"/> to await the operation.</returns>
|
||||
public static async Task WriteMetricsCollection(
|
||||
this PrometheusExporter exporter,
|
||||
Batch<Metric> metrics,
|
||||
Stream stream,
|
||||
Func<DateTimeOffset> getUtcNowDateTimeOffset)
|
||||
{
|
||||
byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
|
||||
try
|
||||
{
|
||||
foreach (var metric in exporter.Metrics)
|
||||
foreach (var metric in metrics)
|
||||
{
|
||||
if (!MetricInfoCache.TryGetValue(metric.Name, out MetricInfo metricInfo))
|
||||
{
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
new CancellationTokenSource() :
|
||||
CancellationTokenSource.CreateLinkedTokenSource(token);
|
||||
|
||||
this.workerThread = Task.Factory.StartNew(this.WorkerThread, default, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
||||
this.workerThread = Task.Factory.StartNew(this.WorkerProc, default, TaskCreationOptions.LongRunning, TaskScheduler.Default);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -115,7 +115,7 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
}
|
||||
}
|
||||
|
||||
private void WorkerThread()
|
||||
private void WorkerProc()
|
||||
{
|
||||
this.httpListener.Start();
|
||||
|
||||
|
|
@ -128,13 +128,43 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
ctxTask.Wait(this.tokenSource.Token);
|
||||
var ctx = ctxTask.Result;
|
||||
|
||||
if (!this.exporter.TryEnterSemaphore())
|
||||
try
|
||||
{
|
||||
ctx.Response.StatusCode = 429;
|
||||
ctx.Response.Close();
|
||||
ctx.Response.StatusCode = 200;
|
||||
ctx.Response.ContentType = PrometheusMetricsFormatHelper.ContentType;
|
||||
|
||||
this.exporter.OnExport = (metrics) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var buffer = new byte[65536];
|
||||
var cursor = PrometheusSerializer.WriteMetrics(buffer, 0, metrics);
|
||||
ctx.Response.OutputStream.Write(buffer, 0, cursor - 0);
|
||||
return ExportResult.Success;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return ExportResult.Failure;
|
||||
}
|
||||
};
|
||||
|
||||
this.exporter.Collect(Timeout.Infinite);
|
||||
this.exporter.OnExport = null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
PrometheusExporterEventSource.Log.FailedExport(ex);
|
||||
|
||||
ctx.Response.StatusCode = 500;
|
||||
}
|
||||
|
||||
Task.Run(() => this.ProcessExportRequest(ctx));
|
||||
try
|
||||
{
|
||||
ctx.Response.Close();
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
|
|
@ -154,36 +184,5 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessExportRequest(HttpListenerContext context)
|
||||
{
|
||||
try
|
||||
{
|
||||
this.exporter.Collect(Timeout.Infinite);
|
||||
|
||||
context.Response.StatusCode = 200;
|
||||
context.Response.ContentType = PrometheusMetricsFormatHelper.ContentType;
|
||||
|
||||
await this.exporter.WriteMetricsCollection(context.Response.OutputStream, this.exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
PrometheusExporterEventSource.Log.FailedExport(ex);
|
||||
|
||||
context.Response.StatusCode = 500;
|
||||
}
|
||||
finally
|
||||
{
|
||||
try
|
||||
{
|
||||
context.Response.Close();
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
|
||||
this.exporter.ReleaseSemaphore();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,34 +67,57 @@ namespace OpenTelemetry.Exporter.Prometheus
|
|||
return;
|
||||
}
|
||||
|
||||
var buffer = new byte[65536];
|
||||
var count = 0;
|
||||
|
||||
this.exporter.OnExport = (metrics) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
count = PrometheusSerializer.WriteMetrics(buffer, 0, metrics);
|
||||
return ExportResult.Success;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
PrometheusExporterEventSource.Log.FailedExport(ex);
|
||||
return ExportResult.Failure;
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
this.exporter.Collect(Timeout.Infinite);
|
||||
|
||||
await WriteMetricsToResponse(this.exporter, response).ConfigureAwait(false);
|
||||
if (this.exporter.Collect(Timeout.Infinite))
|
||||
{
|
||||
await WriteMetricsToResponse(buffer, count, response).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
response.StatusCode = 500;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
PrometheusExporterEventSource.Log.FailedExport(ex);
|
||||
if (!response.HasStarted)
|
||||
{
|
||||
response.StatusCode = 500;
|
||||
}
|
||||
|
||||
PrometheusExporterEventSource.Log.FailedExport(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
this.exporter.ReleaseSemaphore();
|
||||
}
|
||||
|
||||
this.exporter.OnExport = null;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static async Task WriteMetricsToResponse(PrometheusExporter exporter, HttpResponse response)
|
||||
internal static async Task WriteMetricsToResponse(byte[] buffer, int count, HttpResponse response)
|
||||
{
|
||||
response.StatusCode = 200;
|
||||
response.ContentType = PrometheusMetricsFormatHelper.ContentType;
|
||||
|
||||
await exporter.WriteMetricsCollection(response.Body, exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);
|
||||
await response.Body.WriteAsync(buffer, 0, count).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,292 @@
|
|||
// <copyright file="PrometheusSerializer.cs" company="OpenTelemetry Authors">
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// </copyright>
|
||||
|
||||
#if NETCOREAPP3_1_OR_GREATER
|
||||
using System;
|
||||
#endif
|
||||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
namespace OpenTelemetry.Exporter.Prometheus
|
||||
{
|
||||
/// <summary>
|
||||
/// Basic PrometheusSerializer which has no OpenTelemetry dependency.
|
||||
/// </summary>
|
||||
internal static partial class PrometheusSerializer
|
||||
{
|
||||
#pragma warning disable SA1310 // Field name should not contain an underscore
|
||||
private const byte ASCII_QUOTATION_MARK = 0x22; // '"'
|
||||
private const byte ASCII_FULL_STOP = 0x2E; // '.'
|
||||
private const byte ASCII_HYPHEN_MINUS = 0x2D; // '-'
|
||||
private const byte ASCII_REVERSE_SOLIDUS = 0x5C; // '\\'
|
||||
private const byte ASCII_LINEFEED = 0x0A; // `\n`
|
||||
#pragma warning restore SA1310 // Field name should not contain an underscore
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteDouble(byte[] buffer, int cursor, double value)
|
||||
{
|
||||
#if NETCOREAPP3_1_OR_GREATER
|
||||
if (double.IsFinite(value))
|
||||
#else
|
||||
if (!double.IsInfinity(value) && !double.IsNaN(value))
|
||||
#endif
|
||||
{
|
||||
#if NETCOREAPP3_1_OR_GREATER
|
||||
Span<char> span = stackalloc char[128];
|
||||
|
||||
var result = value.TryFormat(span, out var cchWritten, "G", CultureInfo.InvariantCulture);
|
||||
Debug.Assert(result, $"{nameof(result)} should be true.");
|
||||
|
||||
for (int i = 0; i < cchWritten; i++)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)span[i]);
|
||||
}
|
||||
#else
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, value.ToString(CultureInfo.InvariantCulture));
|
||||
#endif
|
||||
}
|
||||
else if (double.IsPositiveInfinity(value))
|
||||
{
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "+Inf");
|
||||
}
|
||||
else if (double.IsNegativeInfinity(value))
|
||||
{
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "-Inf");
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.Assert(double.IsNaN(value), $"{nameof(value)} should be NaN.");
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "Nan");
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteLong(byte[] buffer, int cursor, long value)
|
||||
{
|
||||
#if NETCOREAPP3_1_OR_GREATER
|
||||
Span<char> span = stackalloc char[20];
|
||||
|
||||
var result = value.TryFormat(span, out var cchWritten, "G", CultureInfo.InvariantCulture);
|
||||
Debug.Assert(result, $"{nameof(result)} should be true.");
|
||||
|
||||
for (int i = 0; i < cchWritten; i++)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)span[i]);
|
||||
}
|
||||
#else
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, value.ToString(CultureInfo.InvariantCulture));
|
||||
#endif
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteAsciiStringNoEscape(byte[] buffer, int cursor, string value)
|
||||
{
|
||||
for (int i = 0; i < value.Length; i++)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)value[i]);
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteUnicodeNoEscape(byte[] buffer, int cursor, ushort ordinal)
|
||||
{
|
||||
if (ordinal <= 0x7F)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)ordinal);
|
||||
}
|
||||
else if (ordinal <= 0x07FF)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)(0b_1100_0000 | (ordinal >> 6)));
|
||||
buffer[cursor++] = unchecked((byte)(0b_1000_0000 | (ordinal & 0b_0011_1111)));
|
||||
}
|
||||
else if (ordinal <= 0xFFFF)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)(0b_1110_0000 | (ordinal >> 12)));
|
||||
buffer[cursor++] = unchecked((byte)(0b_1000_0000 | ((ordinal >> 6) & 0b_0011_1111)));
|
||||
buffer[cursor++] = unchecked((byte)(0b_1000_0000 | (ordinal & 0b_0011_1111)));
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.Assert(ordinal <= 0xFFFF, ".NET string should not go beyond Unicode BMP.");
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteUnicodeString(byte[] buffer, int cursor, string value)
|
||||
{
|
||||
for (int i = 0; i < value.Length; i++)
|
||||
{
|
||||
var ordinal = (ushort)value[i];
|
||||
switch (ordinal)
|
||||
{
|
||||
case ASCII_REVERSE_SOLIDUS:
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
break;
|
||||
case ASCII_LINEFEED:
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
buffer[cursor++] = unchecked((byte)'n');
|
||||
break;
|
||||
default:
|
||||
cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteLabelKey(byte[] buffer, int cursor, string value)
|
||||
{
|
||||
Debug.Assert(!string.IsNullOrEmpty(value), $"{nameof(value)} should not be null or empty.");
|
||||
|
||||
var ordinal = (ushort)value[0];
|
||||
|
||||
if (ordinal >= (ushort)'0' && ordinal <= (ushort)'9')
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)'_');
|
||||
}
|
||||
|
||||
for (int i = 0; i < value.Length; i++)
|
||||
{
|
||||
ordinal = (ushort)value[i];
|
||||
|
||||
if ((ordinal >= (ushort)'A' && ordinal <= (ushort)'Z') ||
|
||||
(ordinal >= (ushort)'a' && ordinal <= (ushort)'z') ||
|
||||
(ordinal >= (ushort)'0' && ordinal <= (ushort)'9'))
|
||||
{
|
||||
cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal);
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)'_');
|
||||
}
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteLabelValue(byte[] buffer, int cursor, string value)
|
||||
{
|
||||
Debug.Assert(value != null, $"{nameof(value)} should not be null.");
|
||||
|
||||
for (int i = 0; i < value.Length; i++)
|
||||
{
|
||||
var ordinal = (ushort)value[i];
|
||||
switch (ordinal)
|
||||
{
|
||||
case ASCII_QUOTATION_MARK:
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
buffer[cursor++] = ASCII_QUOTATION_MARK;
|
||||
break;
|
||||
case ASCII_REVERSE_SOLIDUS:
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
break;
|
||||
case ASCII_LINEFEED:
|
||||
buffer[cursor++] = ASCII_REVERSE_SOLIDUS;
|
||||
buffer[cursor++] = unchecked((byte)'n');
|
||||
break;
|
||||
default:
|
||||
cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteLabel(byte[] buffer, int cursor, string labelKey, object labelValue)
|
||||
{
|
||||
cursor = WriteLabelKey(buffer, cursor, labelKey);
|
||||
buffer[cursor++] = unchecked((byte)'=');
|
||||
buffer[cursor++] = unchecked((byte)'"');
|
||||
|
||||
// In Prometheus, a label with an empty label value is considered equivalent to a label that does not exist.
|
||||
cursor = WriteLabelValue(buffer, cursor, labelValue?.ToString() ?? string.Empty);
|
||||
buffer[cursor++] = unchecked((byte)'"');
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteMetricName(byte[] buffer, int cursor, string metricName)
|
||||
{
|
||||
Debug.Assert(!string.IsNullOrEmpty(metricName), $"{nameof(metricName)} should not be null or empty.");
|
||||
|
||||
for (int i = 0; i < metricName.Length; i++)
|
||||
{
|
||||
var ordinal = (ushort)metricName[i];
|
||||
switch (ordinal)
|
||||
{
|
||||
case ASCII_FULL_STOP:
|
||||
case ASCII_HYPHEN_MINUS:
|
||||
buffer[cursor++] = unchecked((byte)'_');
|
||||
break;
|
||||
default:
|
||||
cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteHelpText(byte[] buffer, int cursor, string metricName, string metricDescription = null)
|
||||
{
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "# HELP ");
|
||||
cursor = WriteMetricName(buffer, cursor, metricName);
|
||||
|
||||
if (metricDescription != null)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
cursor = WriteUnicodeString(buffer, cursor, metricDescription);
|
||||
}
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
public static int WriteTypeInfo(byte[] buffer, int cursor, string metricName, string metricType)
|
||||
{
|
||||
Debug.Assert(!string.IsNullOrEmpty(metricType), $"{nameof(metricType)} should not be null or empty.");
|
||||
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "# TYPE ");
|
||||
cursor = WriteMetricName(buffer, cursor, metricName);
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, metricType);
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
|
||||
return cursor;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,199 @@
|
|||
// <copyright file="PrometheusSerializerExt.cs" company="OpenTelemetry Authors">
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// </copyright>
|
||||
|
||||
using OpenTelemetry.Metrics;
|
||||
|
||||
namespace OpenTelemetry.Exporter.Prometheus
|
||||
{
|
||||
/// <summary>
|
||||
/// OpenTelemetry additions to the PrometheusSerializer.
|
||||
/// </summary>
|
||||
internal static partial class PrometheusSerializer
|
||||
{
|
||||
private static readonly string[] MetricTypes = new string[] { "untyped", "counter", "gauge", "histogram", "summary" };
|
||||
|
||||
public static int WriteMetrics(byte[] buffer, int cursor, Batch<Metric> metrics)
|
||||
{
|
||||
var spacing = false;
|
||||
|
||||
foreach (var metric in metrics)
|
||||
{
|
||||
if (spacing)
|
||||
{
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
}
|
||||
else
|
||||
{
|
||||
spacing = true;
|
||||
}
|
||||
|
||||
cursor = WriteMetric(buffer, cursor, metric);
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
|
||||
public static int WriteMetric(byte[] buffer, int cursor, Metric metric)
|
||||
{
|
||||
if (metric.Description != null)
|
||||
{
|
||||
cursor = WriteHelpText(buffer, cursor, metric.Name, metric.Description);
|
||||
}
|
||||
|
||||
int metricType = (int)metric.MetricType >> 4;
|
||||
cursor = WriteTypeInfo(buffer, cursor, metric.Name, MetricTypes[metricType]);
|
||||
|
||||
if (metric.MetricType != MetricType.Histogram)
|
||||
{
|
||||
foreach (ref var metricPoint in metric.GetMetricPoints())
|
||||
{
|
||||
var keys = metricPoint.Keys;
|
||||
var values = metricPoint.Values;
|
||||
var timestamp = metricPoint.EndTime.ToUnixTimeMilliseconds();
|
||||
|
||||
// Counter and Gauge
|
||||
cursor = WriteMetricName(buffer, cursor, metric.Name);
|
||||
buffer[cursor++] = unchecked((byte)'{');
|
||||
|
||||
for (var i = 0; i < keys.Length; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)',');
|
||||
}
|
||||
|
||||
cursor = WriteLabel(buffer, cursor, keys[i], values[i]);
|
||||
}
|
||||
|
||||
buffer[cursor++] = unchecked((byte)'}');
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
if (((int)metric.MetricType & 0b_0000_1111) == 0x0a /* I8 */)
|
||||
{
|
||||
cursor = WriteLong(buffer, cursor, metricPoint.LongValue);
|
||||
}
|
||||
else
|
||||
{
|
||||
cursor = WriteDouble(buffer, cursor, metricPoint.DoubleValue);
|
||||
}
|
||||
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteLong(buffer, cursor, timestamp);
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (ref var metricPoint in metric.GetMetricPoints())
|
||||
{
|
||||
var keys = metricPoint.Keys;
|
||||
var values = metricPoint.Values;
|
||||
var timestamp = metricPoint.EndTime.ToUnixTimeMilliseconds();
|
||||
|
||||
// Histogram buckets
|
||||
var bucketCounts = metricPoint.BucketCounts;
|
||||
var explicitBounds = metricPoint.ExplicitBounds;
|
||||
long totalCount = 0;
|
||||
for (int idxBound = 0; idxBound < explicitBounds.Length + 1; idxBound++)
|
||||
{
|
||||
totalCount += bucketCounts[idxBound];
|
||||
|
||||
cursor = WriteMetricName(buffer, cursor, metric.Name);
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "_bucket{");
|
||||
|
||||
for (var i = 0; i < keys.Length; i++)
|
||||
{
|
||||
cursor = WriteLabel(buffer, cursor, keys[i], values[i]);
|
||||
buffer[cursor++] = unchecked((byte)',');
|
||||
}
|
||||
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "le=\"");
|
||||
|
||||
if (idxBound < explicitBounds.Length)
|
||||
{
|
||||
cursor = WriteDouble(buffer, cursor, explicitBounds[idxBound]);
|
||||
}
|
||||
else
|
||||
{
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "+Inf");
|
||||
}
|
||||
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "\"} ");
|
||||
|
||||
cursor = WriteLong(buffer, cursor, totalCount);
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteLong(buffer, cursor, timestamp);
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
}
|
||||
|
||||
// Histogram sum
|
||||
cursor = WriteMetricName(buffer, cursor, metric.Name);
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "_sum{");
|
||||
|
||||
for (var i = 0; i < keys.Length; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)',');
|
||||
}
|
||||
|
||||
cursor = WriteLabel(buffer, cursor, keys[i], values[i]);
|
||||
}
|
||||
|
||||
buffer[cursor++] = unchecked((byte)'}');
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteDouble(buffer, cursor, metricPoint.DoubleValue);
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteLong(buffer, cursor, timestamp);
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
|
||||
// Histogram count
|
||||
cursor = WriteMetricName(buffer, cursor, metric.Name);
|
||||
cursor = WriteAsciiStringNoEscape(buffer, cursor, "_count{");
|
||||
|
||||
for (var i = 0; i < keys.Length; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
{
|
||||
buffer[cursor++] = unchecked((byte)',');
|
||||
}
|
||||
|
||||
cursor = WriteLabel(buffer, cursor, keys[i], values[i]);
|
||||
}
|
||||
|
||||
buffer[cursor++] = unchecked((byte)'}');
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteLong(buffer, cursor, totalCount);
|
||||
buffer[cursor++] = unchecked((byte)' ');
|
||||
|
||||
cursor = WriteLong(buffer, cursor, timestamp);
|
||||
|
||||
buffer[cursor++] = ASCII_LINEFEED;
|
||||
}
|
||||
}
|
||||
|
||||
return cursor;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -34,6 +34,7 @@ namespace OpenTelemetry.Exporter
|
|||
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
|
||||
private readonly PrometheusExporterMetricsHttpServer metricsHttpServer;
|
||||
private Func<int, bool> funcCollect;
|
||||
private Func<Batch<Metric>, ExportResult> funcExport;
|
||||
private bool disposed;
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -64,10 +65,15 @@ namespace OpenTelemetry.Exporter
|
|||
set => this.funcCollect = value;
|
||||
}
|
||||
|
||||
internal Func<Batch<Metric>, ExportResult> OnExport
|
||||
{
|
||||
get => this.funcExport;
|
||||
set => this.funcExport = value;
|
||||
}
|
||||
|
||||
public override ExportResult Export(in Batch<Metric> metrics)
|
||||
{
|
||||
this.Metrics = metrics;
|
||||
return ExportResult.Success;
|
||||
return this.OnExport(metrics);
|
||||
}
|
||||
|
||||
internal bool TryEnterSemaphore()
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ namespace Benchmarks.Exporter
|
|||
this.meterProvider?.Dispose();
|
||||
}
|
||||
|
||||
/* TODO: revisit this after PrometheusExporter race condition is solved
|
||||
[Benchmark]
|
||||
public async Task WriteMetricsToResponse()
|
||||
{
|
||||
|
|
@ -91,6 +92,7 @@ namespace Benchmarks.Exporter
|
|||
await PrometheusExporterMiddleware.WriteMetricsToResponse(this.exporter, this.context.Response).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
|
||||
using MemoryStream ms = new MemoryStream();
|
||||
|
||||
PrometheusExporterExtensions.WriteMetricsCollection(prometheusExporter, ms, () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero)).GetAwaiter().GetResult();
|
||||
PrometheusExporterExtensions.WriteMetricsCollection(prometheusExporter, metrics, ms, () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero)).GetAwaiter().GetResult();
|
||||
|
||||
Assert.Equal(
|
||||
expected,
|
||||
|
|
|
|||
|
|
@ -48,7 +48,6 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
.AddMeter(meter.Name)
|
||||
.AddPrometheusExporter(o =>
|
||||
{
|
||||
o.GetUtcNowDateTimeOffset = () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero);
|
||||
#if NET461
|
||||
bool expectedDefaultState = true;
|
||||
#else
|
||||
|
|
@ -90,6 +89,8 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
new KeyValuePair<string, object>("key2", "value2"),
|
||||
};
|
||||
|
||||
var beginTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
|
||||
|
||||
var counter = meter.CreateCounter<double>("counter_double");
|
||||
counter.Add(100.18D, tags);
|
||||
counter.Add(0.99D, tags);
|
||||
|
|
@ -98,13 +99,23 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
|
||||
using var response = await client.GetAsync($"{address}metrics").ConfigureAwait(false);
|
||||
|
||||
var endTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
|
||||
|
||||
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
|
||||
|
||||
string content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
|
||||
|
||||
var index = content.LastIndexOf(' ');
|
||||
|
||||
Assert.Equal(
|
||||
$"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17 1633041000000\n",
|
||||
content);
|
||||
$"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17",
|
||||
content.Substring(0, index));
|
||||
|
||||
Assert.Equal('\n', content[content.Length - 1]);
|
||||
|
||||
var timestamp = long.Parse(content.Substring(index, content.Length - index - 1));
|
||||
|
||||
Assert.True(beginTimestamp <= timestamp && timestamp <= endTimestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,9 +62,11 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
|
||||
string content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
|
||||
|
||||
int index = content.LastIndexOf(' ');
|
||||
|
||||
Assert.Equal(
|
||||
$"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17 1633041000000\n",
|
||||
content);
|
||||
$"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17",
|
||||
content.Substring(0, index));
|
||||
|
||||
await host.StopAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
|
@ -77,7 +79,6 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests
|
|||
.AddMeter(MeterName)
|
||||
.AddPrometheusExporter(o =>
|
||||
{
|
||||
o.GetUtcNowDateTimeOffset = () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero);
|
||||
if (o.StartHttpListener)
|
||||
{
|
||||
throw new InvalidOperationException("StartHttpListener should be false on .NET Core 3.1+.");
|
||||
|
|
|
|||
Loading…
Reference in New Issue