From 11654f19e286b55e9508da402286aa54f2ad9094 Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Tue, 9 Nov 2021 13:30:28 -0800 Subject: [PATCH] Solve PrometheusExporter race condition (step 1) (#2553) * prom prototype * minor refactor * improve mem * skeleton of the PrometheusSerializer * a working server * clear up test * specialize netframework * clean up * update test * fix unit test * minor improvement * comment * escape metric names * handle inf/nan * minor name change * nit * more assertion * patch for old frameworks * add comment explaining why we use empty string if the label value is null * Updated PrometheusExporterMiddleware for new PrometheusSerializer API. * Added some exception handling. * Test fix. Co-authored-by: Mikel Blanchard --- examples/Console/TestPrometheusExporter.cs | 36 +-- .../PrometheusExporterExtensions.cs | 4 +- .../PrometheusExporterMetricsHttpServer.cs | 73 +++-- .../PrometheusExporterMiddleware.cs | 37 ++- .../Implementation/PrometheusSerializer.cs | 292 ++++++++++++++++++ .../Implementation/PrometheusSerializerExt.cs | 199 ++++++++++++ .../PrometheusExporter.cs | 10 +- .../PrometheusExporterMiddlewareBenchmarks.cs | 2 + .../PrometheusExporterExtensionsTests.cs | 2 +- ...rometheusExporterMetricsHttpServerTests.cs | 17 +- .../PrometheusExporterMiddlewareTests.cs | 7 +- 11 files changed, 602 insertions(+), 77 deletions(-) create mode 100644 src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializer.cs create mode 100644 src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs diff --git a/examples/Console/TestPrometheusExporter.cs b/examples/Console/TestPrometheusExporter.cs index 39d9b6b24..2cb360d05 100644 --- a/examples/Console/TestPrometheusExporter.cs +++ b/examples/Console/TestPrometheusExporter.cs @@ -28,9 +28,9 @@ namespace Examples.Console internal class TestPrometheusExporter { private static readonly Meter MyMeter = new Meter("TestMeter"); - private static readonly Counter Counter = MyMeter.CreateCounter("myCounter"); - private static readonly Histogram MyHistogram = MyMeter.CreateHistogram("myHistogram"); - private static readonly Random RandomGenerator = new Random(); + private static readonly Counter Counter = MyMeter.CreateCounter("myCounter", description: "A counter for demonstration purpose."); + private static readonly Histogram MyHistogram = MyMeter.CreateHistogram("myHistogram", description: "A histogram for demonstration purpose."); + private static readonly ThreadLocal ThreadLocalRandom = new ThreadLocal(() => new Random()); internal static object Run(int port, int totalDurationInMins) { @@ -56,43 +56,33 @@ namespace Examples.Console }) .Build(); +#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214 ObservableGauge gauge = MyMeter.CreateObservableGauge( - "Gauge", + "myGauge", () => { - var tag1 = new KeyValuePair("tag1", "value1"); - var tag2 = new KeyValuePair("tag2", "value2"); - return new List>() { - new Measurement(RandomGenerator.Next(1, 1000), tag1, tag2), + new Measurement(ThreadLocalRandom.Value.Next(1, 1000), new("tag1", "value1"), new("tag2", "value2")), + new Measurement(ThreadLocalRandom.Value.Next(1, 1000), new("tag1", "value1"), new("tag2", "value3")), }; - }); + }, + description: "A gauge for demonstration purpose."); using var token = new CancellationTokenSource(); Task writeMetricTask = new Task(() => { while (!token.IsCancellationRequested) { - Counter.Add( - 10, - new KeyValuePair("tag1", "value1"), - new KeyValuePair("tag2", "value2")); - - Counter.Add( - 100, - new KeyValuePair("tag1", "anothervalue"), - new KeyValuePair("tag2", "somethingelse")); - - MyHistogram.Record( - RandomGenerator.Next(1, 1500), - new KeyValuePair("tag1", "value1"), - new KeyValuePair("tag2", "value2")); + Counter.Add(9.9, new("name", "apple"), new("color", "red")); + Counter.Add(99.9, new("name", "lemon"), new("color", "yellow")); + MyHistogram.Record(ThreadLocalRandom.Value.Next(1, 1500), new("tag1", "value1"), new("tag2", "value2")); Task.Delay(10).Wait(); } }); writeMetricTask.Start(); +#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly token.CancelAfter(totalDurationInMins * 60 * 1000); diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterExtensions.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterExtensions.cs index 896358296..8fc0c7296 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterExtensions.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterExtensions.cs @@ -199,18 +199,20 @@ namespace OpenTelemetry.Exporter.Prometheus /// Serialize metrics to prometheus format. /// /// . + /// Metrics to be exported. /// Stream to write to. /// Optional function to resolve the current date & time. /// to await the operation. public static async Task WriteMetricsCollection( this PrometheusExporter exporter, + Batch metrics, Stream stream, Func getUtcNowDateTimeOffset) { byte[] buffer = ArrayPool.Shared.Rent(8192); try { - foreach (var metric in exporter.Metrics) + foreach (var metric in metrics) { if (!MetricInfoCache.TryGetValue(metric.Name, out MetricInfo metricInfo)) { diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMetricsHttpServer.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMetricsHttpServer.cs index f4b562aed..6cfb6afb7 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMetricsHttpServer.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMetricsHttpServer.cs @@ -83,7 +83,7 @@ namespace OpenTelemetry.Exporter.Prometheus new CancellationTokenSource() : CancellationTokenSource.CreateLinkedTokenSource(token); - this.workerThread = Task.Factory.StartNew(this.WorkerThread, default, TaskCreationOptions.LongRunning, TaskScheduler.Default); + this.workerThread = Task.Factory.StartNew(this.WorkerProc, default, TaskCreationOptions.LongRunning, TaskScheduler.Default); } } @@ -115,7 +115,7 @@ namespace OpenTelemetry.Exporter.Prometheus } } - private void WorkerThread() + private void WorkerProc() { this.httpListener.Start(); @@ -128,13 +128,43 @@ namespace OpenTelemetry.Exporter.Prometheus ctxTask.Wait(this.tokenSource.Token); var ctx = ctxTask.Result; - if (!this.exporter.TryEnterSemaphore()) + try { - ctx.Response.StatusCode = 429; - ctx.Response.Close(); + ctx.Response.StatusCode = 200; + ctx.Response.ContentType = PrometheusMetricsFormatHelper.ContentType; + + this.exporter.OnExport = (metrics) => + { + try + { + var buffer = new byte[65536]; + var cursor = PrometheusSerializer.WriteMetrics(buffer, 0, metrics); + ctx.Response.OutputStream.Write(buffer, 0, cursor - 0); + return ExportResult.Success; + } + catch (Exception) + { + return ExportResult.Failure; + } + }; + + this.exporter.Collect(Timeout.Infinite); + this.exporter.OnExport = null; + } + catch (Exception ex) + { + PrometheusExporterEventSource.Log.FailedExport(ex); + + ctx.Response.StatusCode = 500; } - Task.Run(() => this.ProcessExportRequest(ctx)); + try + { + ctx.Response.Close(); + } + catch + { + } } } catch (OperationCanceledException ex) @@ -154,36 +184,5 @@ namespace OpenTelemetry.Exporter.Prometheus } } } - - private async Task ProcessExportRequest(HttpListenerContext context) - { - try - { - this.exporter.Collect(Timeout.Infinite); - - context.Response.StatusCode = 200; - context.Response.ContentType = PrometheusMetricsFormatHelper.ContentType; - - await this.exporter.WriteMetricsCollection(context.Response.OutputStream, this.exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false); - } - catch (Exception ex) - { - PrometheusExporterEventSource.Log.FailedExport(ex); - - context.Response.StatusCode = 500; - } - finally - { - try - { - context.Response.Close(); - } - catch - { - } - - this.exporter.ReleaseSemaphore(); - } - } } } diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs index 110b4f5c4..70bc0bb9a 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusExporterMiddleware.cs @@ -67,34 +67,57 @@ namespace OpenTelemetry.Exporter.Prometheus return; } + var buffer = new byte[65536]; + var count = 0; + + this.exporter.OnExport = (metrics) => + { + try + { + count = PrometheusSerializer.WriteMetrics(buffer, 0, metrics); + return ExportResult.Success; + } + catch (Exception ex) + { + PrometheusExporterEventSource.Log.FailedExport(ex); + return ExportResult.Failure; + } + }; + try { - this.exporter.Collect(Timeout.Infinite); - - await WriteMetricsToResponse(this.exporter, response).ConfigureAwait(false); + if (this.exporter.Collect(Timeout.Infinite)) + { + await WriteMetricsToResponse(buffer, count, response).ConfigureAwait(false); + } + else + { + response.StatusCode = 500; + } } catch (Exception ex) { + PrometheusExporterEventSource.Log.FailedExport(ex); if (!response.HasStarted) { response.StatusCode = 500; } - - PrometheusExporterEventSource.Log.FailedExport(ex); } finally { this.exporter.ReleaseSemaphore(); } + + this.exporter.OnExport = null; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static async Task WriteMetricsToResponse(PrometheusExporter exporter, HttpResponse response) + internal static async Task WriteMetricsToResponse(byte[] buffer, int count, HttpResponse response) { response.StatusCode = 200; response.ContentType = PrometheusMetricsFormatHelper.ContentType; - await exporter.WriteMetricsCollection(response.Body, exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false); + await response.Body.WriteAsync(buffer, 0, count).ConfigureAwait(false); } } } diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializer.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializer.cs new file mode 100644 index 000000000..8131330ba --- /dev/null +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializer.cs @@ -0,0 +1,292 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#if NETCOREAPP3_1_OR_GREATER +using System; +#endif +using System.Diagnostics; +using System.Globalization; +using System.Runtime.CompilerServices; + +namespace OpenTelemetry.Exporter.Prometheus +{ + /// + /// Basic PrometheusSerializer which has no OpenTelemetry dependency. + /// + internal static partial class PrometheusSerializer + { +#pragma warning disable SA1310 // Field name should not contain an underscore + private const byte ASCII_QUOTATION_MARK = 0x22; // '"' + private const byte ASCII_FULL_STOP = 0x2E; // '.' + private const byte ASCII_HYPHEN_MINUS = 0x2D; // '-' + private const byte ASCII_REVERSE_SOLIDUS = 0x5C; // '\\' + private const byte ASCII_LINEFEED = 0x0A; // `\n` +#pragma warning restore SA1310 // Field name should not contain an underscore + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteDouble(byte[] buffer, int cursor, double value) + { +#if NETCOREAPP3_1_OR_GREATER + if (double.IsFinite(value)) +#else + if (!double.IsInfinity(value) && !double.IsNaN(value)) +#endif + { +#if NETCOREAPP3_1_OR_GREATER + Span span = stackalloc char[128]; + + var result = value.TryFormat(span, out var cchWritten, "G", CultureInfo.InvariantCulture); + Debug.Assert(result, $"{nameof(result)} should be true."); + + for (int i = 0; i < cchWritten; i++) + { + buffer[cursor++] = unchecked((byte)span[i]); + } +#else + cursor = WriteAsciiStringNoEscape(buffer, cursor, value.ToString(CultureInfo.InvariantCulture)); +#endif + } + else if (double.IsPositiveInfinity(value)) + { + cursor = WriteAsciiStringNoEscape(buffer, cursor, "+Inf"); + } + else if (double.IsNegativeInfinity(value)) + { + cursor = WriteAsciiStringNoEscape(buffer, cursor, "-Inf"); + } + else + { + Debug.Assert(double.IsNaN(value), $"{nameof(value)} should be NaN."); + cursor = WriteAsciiStringNoEscape(buffer, cursor, "Nan"); + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteLong(byte[] buffer, int cursor, long value) + { +#if NETCOREAPP3_1_OR_GREATER + Span span = stackalloc char[20]; + + var result = value.TryFormat(span, out var cchWritten, "G", CultureInfo.InvariantCulture); + Debug.Assert(result, $"{nameof(result)} should be true."); + + for (int i = 0; i < cchWritten; i++) + { + buffer[cursor++] = unchecked((byte)span[i]); + } +#else + cursor = WriteAsciiStringNoEscape(buffer, cursor, value.ToString(CultureInfo.InvariantCulture)); +#endif + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteAsciiStringNoEscape(byte[] buffer, int cursor, string value) + { + for (int i = 0; i < value.Length; i++) + { + buffer[cursor++] = unchecked((byte)value[i]); + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteUnicodeNoEscape(byte[] buffer, int cursor, ushort ordinal) + { + if (ordinal <= 0x7F) + { + buffer[cursor++] = unchecked((byte)ordinal); + } + else if (ordinal <= 0x07FF) + { + buffer[cursor++] = unchecked((byte)(0b_1100_0000 | (ordinal >> 6))); + buffer[cursor++] = unchecked((byte)(0b_1000_0000 | (ordinal & 0b_0011_1111))); + } + else if (ordinal <= 0xFFFF) + { + buffer[cursor++] = unchecked((byte)(0b_1110_0000 | (ordinal >> 12))); + buffer[cursor++] = unchecked((byte)(0b_1000_0000 | ((ordinal >> 6) & 0b_0011_1111))); + buffer[cursor++] = unchecked((byte)(0b_1000_0000 | (ordinal & 0b_0011_1111))); + } + else + { + Debug.Assert(ordinal <= 0xFFFF, ".NET string should not go beyond Unicode BMP."); + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteUnicodeString(byte[] buffer, int cursor, string value) + { + for (int i = 0; i < value.Length; i++) + { + var ordinal = (ushort)value[i]; + switch (ordinal) + { + case ASCII_REVERSE_SOLIDUS: + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + break; + case ASCII_LINEFEED: + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + buffer[cursor++] = unchecked((byte)'n'); + break; + default: + cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal); + break; + } + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteLabelKey(byte[] buffer, int cursor, string value) + { + Debug.Assert(!string.IsNullOrEmpty(value), $"{nameof(value)} should not be null or empty."); + + var ordinal = (ushort)value[0]; + + if (ordinal >= (ushort)'0' && ordinal <= (ushort)'9') + { + buffer[cursor++] = unchecked((byte)'_'); + } + + for (int i = 0; i < value.Length; i++) + { + ordinal = (ushort)value[i]; + + if ((ordinal >= (ushort)'A' && ordinal <= (ushort)'Z') || + (ordinal >= (ushort)'a' && ordinal <= (ushort)'z') || + (ordinal >= (ushort)'0' && ordinal <= (ushort)'9')) + { + cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal); + } + else + { + buffer[cursor++] = unchecked((byte)'_'); + } + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteLabelValue(byte[] buffer, int cursor, string value) + { + Debug.Assert(value != null, $"{nameof(value)} should not be null."); + + for (int i = 0; i < value.Length; i++) + { + var ordinal = (ushort)value[i]; + switch (ordinal) + { + case ASCII_QUOTATION_MARK: + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + buffer[cursor++] = ASCII_QUOTATION_MARK; + break; + case ASCII_REVERSE_SOLIDUS: + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + break; + case ASCII_LINEFEED: + buffer[cursor++] = ASCII_REVERSE_SOLIDUS; + buffer[cursor++] = unchecked((byte)'n'); + break; + default: + cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal); + break; + } + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteLabel(byte[] buffer, int cursor, string labelKey, object labelValue) + { + cursor = WriteLabelKey(buffer, cursor, labelKey); + buffer[cursor++] = unchecked((byte)'='); + buffer[cursor++] = unchecked((byte)'"'); + + // In Prometheus, a label with an empty label value is considered equivalent to a label that does not exist. + cursor = WriteLabelValue(buffer, cursor, labelValue?.ToString() ?? string.Empty); + buffer[cursor++] = unchecked((byte)'"'); + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteMetricName(byte[] buffer, int cursor, string metricName) + { + Debug.Assert(!string.IsNullOrEmpty(metricName), $"{nameof(metricName)} should not be null or empty."); + + for (int i = 0; i < metricName.Length; i++) + { + var ordinal = (ushort)metricName[i]; + switch (ordinal) + { + case ASCII_FULL_STOP: + case ASCII_HYPHEN_MINUS: + buffer[cursor++] = unchecked((byte)'_'); + break; + default: + cursor = WriteUnicodeNoEscape(buffer, cursor, ordinal); + break; + } + } + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteHelpText(byte[] buffer, int cursor, string metricName, string metricDescription = null) + { + cursor = WriteAsciiStringNoEscape(buffer, cursor, "# HELP "); + cursor = WriteMetricName(buffer, cursor, metricName); + + if (metricDescription != null) + { + buffer[cursor++] = unchecked((byte)' '); + cursor = WriteUnicodeString(buffer, cursor, metricDescription); + } + + buffer[cursor++] = ASCII_LINEFEED; + + return cursor; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int WriteTypeInfo(byte[] buffer, int cursor, string metricName, string metricType) + { + Debug.Assert(!string.IsNullOrEmpty(metricType), $"{nameof(metricType)} should not be null or empty."); + + cursor = WriteAsciiStringNoEscape(buffer, cursor, "# TYPE "); + cursor = WriteMetricName(buffer, cursor, metricName); + buffer[cursor++] = unchecked((byte)' '); + cursor = WriteAsciiStringNoEscape(buffer, cursor, metricType); + + buffer[cursor++] = ASCII_LINEFEED; + + return cursor; + } + } +} diff --git a/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs new file mode 100644 index 000000000..c5cb7c5b3 --- /dev/null +++ b/src/OpenTelemetry.Exporter.Prometheus/Implementation/PrometheusSerializerExt.cs @@ -0,0 +1,199 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using OpenTelemetry.Metrics; + +namespace OpenTelemetry.Exporter.Prometheus +{ + /// + /// OpenTelemetry additions to the PrometheusSerializer. + /// + internal static partial class PrometheusSerializer + { + private static readonly string[] MetricTypes = new string[] { "untyped", "counter", "gauge", "histogram", "summary" }; + + public static int WriteMetrics(byte[] buffer, int cursor, Batch metrics) + { + var spacing = false; + + foreach (var metric in metrics) + { + if (spacing) + { + buffer[cursor++] = ASCII_LINEFEED; + } + else + { + spacing = true; + } + + cursor = WriteMetric(buffer, cursor, metric); + } + + return cursor; + } + + public static int WriteMetric(byte[] buffer, int cursor, Metric metric) + { + if (metric.Description != null) + { + cursor = WriteHelpText(buffer, cursor, metric.Name, metric.Description); + } + + int metricType = (int)metric.MetricType >> 4; + cursor = WriteTypeInfo(buffer, cursor, metric.Name, MetricTypes[metricType]); + + if (metric.MetricType != MetricType.Histogram) + { + foreach (ref var metricPoint in metric.GetMetricPoints()) + { + var keys = metricPoint.Keys; + var values = metricPoint.Values; + var timestamp = metricPoint.EndTime.ToUnixTimeMilliseconds(); + + // Counter and Gauge + cursor = WriteMetricName(buffer, cursor, metric.Name); + buffer[cursor++] = unchecked((byte)'{'); + + for (var i = 0; i < keys.Length; i++) + { + if (i > 0) + { + buffer[cursor++] = unchecked((byte)','); + } + + cursor = WriteLabel(buffer, cursor, keys[i], values[i]); + } + + buffer[cursor++] = unchecked((byte)'}'); + buffer[cursor++] = unchecked((byte)' '); + + if (((int)metric.MetricType & 0b_0000_1111) == 0x0a /* I8 */) + { + cursor = WriteLong(buffer, cursor, metricPoint.LongValue); + } + else + { + cursor = WriteDouble(buffer, cursor, metricPoint.DoubleValue); + } + + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteLong(buffer, cursor, timestamp); + + buffer[cursor++] = ASCII_LINEFEED; + } + } + else + { + foreach (ref var metricPoint in metric.GetMetricPoints()) + { + var keys = metricPoint.Keys; + var values = metricPoint.Values; + var timestamp = metricPoint.EndTime.ToUnixTimeMilliseconds(); + + // Histogram buckets + var bucketCounts = metricPoint.BucketCounts; + var explicitBounds = metricPoint.ExplicitBounds; + long totalCount = 0; + for (int idxBound = 0; idxBound < explicitBounds.Length + 1; idxBound++) + { + totalCount += bucketCounts[idxBound]; + + cursor = WriteMetricName(buffer, cursor, metric.Name); + cursor = WriteAsciiStringNoEscape(buffer, cursor, "_bucket{"); + + for (var i = 0; i < keys.Length; i++) + { + cursor = WriteLabel(buffer, cursor, keys[i], values[i]); + buffer[cursor++] = unchecked((byte)','); + } + + cursor = WriteAsciiStringNoEscape(buffer, cursor, "le=\""); + + if (idxBound < explicitBounds.Length) + { + cursor = WriteDouble(buffer, cursor, explicitBounds[idxBound]); + } + else + { + cursor = WriteAsciiStringNoEscape(buffer, cursor, "+Inf"); + } + + cursor = WriteAsciiStringNoEscape(buffer, cursor, "\"} "); + + cursor = WriteLong(buffer, cursor, totalCount); + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteLong(buffer, cursor, timestamp); + + buffer[cursor++] = ASCII_LINEFEED; + } + + // Histogram sum + cursor = WriteMetricName(buffer, cursor, metric.Name); + cursor = WriteAsciiStringNoEscape(buffer, cursor, "_sum{"); + + for (var i = 0; i < keys.Length; i++) + { + if (i > 0) + { + buffer[cursor++] = unchecked((byte)','); + } + + cursor = WriteLabel(buffer, cursor, keys[i], values[i]); + } + + buffer[cursor++] = unchecked((byte)'}'); + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteDouble(buffer, cursor, metricPoint.DoubleValue); + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteLong(buffer, cursor, timestamp); + + buffer[cursor++] = ASCII_LINEFEED; + + // Histogram count + cursor = WriteMetricName(buffer, cursor, metric.Name); + cursor = WriteAsciiStringNoEscape(buffer, cursor, "_count{"); + + for (var i = 0; i < keys.Length; i++) + { + if (i > 0) + { + buffer[cursor++] = unchecked((byte)','); + } + + cursor = WriteLabel(buffer, cursor, keys[i], values[i]); + } + + buffer[cursor++] = unchecked((byte)'}'); + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteLong(buffer, cursor, totalCount); + buffer[cursor++] = unchecked((byte)' '); + + cursor = WriteLong(buffer, cursor, timestamp); + + buffer[cursor++] = ASCII_LINEFEED; + } + } + + return cursor; + } + } +} diff --git a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs index 6d8e3ad25..599756a40 100644 --- a/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs +++ b/src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs @@ -34,6 +34,7 @@ namespace OpenTelemetry.Exporter private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); private readonly PrometheusExporterMetricsHttpServer metricsHttpServer; private Func funcCollect; + private Func, ExportResult> funcExport; private bool disposed; /// @@ -64,10 +65,15 @@ namespace OpenTelemetry.Exporter set => this.funcCollect = value; } + internal Func, ExportResult> OnExport + { + get => this.funcExport; + set => this.funcExport = value; + } + public override ExportResult Export(in Batch metrics) { - this.Metrics = metrics; - return ExportResult.Success; + return this.OnExport(metrics); } internal bool TryEnterSemaphore() diff --git a/test/Benchmarks/Exporter/PrometheusExporterMiddlewareBenchmarks.cs b/test/Benchmarks/Exporter/PrometheusExporterMiddlewareBenchmarks.cs index 194ff9137..5bb7be4b6 100644 --- a/test/Benchmarks/Exporter/PrometheusExporterMiddlewareBenchmarks.cs +++ b/test/Benchmarks/Exporter/PrometheusExporterMiddlewareBenchmarks.cs @@ -81,6 +81,7 @@ namespace Benchmarks.Exporter this.meterProvider?.Dispose(); } + /* TODO: revisit this after PrometheusExporter race condition is solved [Benchmark] public async Task WriteMetricsToResponse() { @@ -91,6 +92,7 @@ namespace Benchmarks.Exporter await PrometheusExporterMiddleware.WriteMetricsToResponse(this.exporter, this.context.Response).ConfigureAwait(false); } } + */ } } #endif diff --git a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterExtensionsTests.cs b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterExtensionsTests.cs index 4399ad94a..e10edfd98 100644 --- a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterExtensionsTests.cs +++ b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterExtensionsTests.cs @@ -74,7 +74,7 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests using MemoryStream ms = new MemoryStream(); - PrometheusExporterExtensions.WriteMetricsCollection(prometheusExporter, ms, () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero)).GetAwaiter().GetResult(); + PrometheusExporterExtensions.WriteMetricsCollection(prometheusExporter, metrics, ms, () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero)).GetAwaiter().GetResult(); Assert.Equal( expected, diff --git a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMetricsHttpServerTests.cs b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMetricsHttpServerTests.cs index 48b773985..1df72c35c 100644 --- a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMetricsHttpServerTests.cs +++ b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMetricsHttpServerTests.cs @@ -48,7 +48,6 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests .AddMeter(meter.Name) .AddPrometheusExporter(o => { - o.GetUtcNowDateTimeOffset = () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero); #if NET461 bool expectedDefaultState = true; #else @@ -90,6 +89,8 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests new KeyValuePair("key2", "value2"), }; + var beginTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(); + var counter = meter.CreateCounter("counter_double"); counter.Add(100.18D, tags); counter.Add(0.99D, tags); @@ -98,13 +99,23 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests using var response = await client.GetAsync($"{address}metrics").ConfigureAwait(false); + var endTimestamp = DateTimeOffset.Now.ToUnixTimeMilliseconds(); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); string content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + var index = content.LastIndexOf(' '); + Assert.Equal( - $"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17 1633041000000\n", - content); + $"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17", + content.Substring(0, index)); + + Assert.Equal('\n', content[content.Length - 1]); + + var timestamp = long.Parse(content.Substring(index, content.Length - index - 1)); + + Assert.True(beginTimestamp <= timestamp && timestamp <= endTimestamp); } } } diff --git a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMiddlewareTests.cs b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMiddlewareTests.cs index bfa630b20..19e074039 100644 --- a/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMiddlewareTests.cs +++ b/test/OpenTelemetry.Exporter.Prometheus.Tests/PrometheusExporterMiddlewareTests.cs @@ -62,9 +62,11 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests string content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + int index = content.LastIndexOf(' '); + Assert.Equal( - $"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17 1633041000000\n", - content); + $"# TYPE counter_double counter\ncounter_double{{key1=\"value1\",key2=\"value2\"}} 101.17", + content.Substring(0, index)); await host.StopAsync().ConfigureAwait(false); } @@ -77,7 +79,6 @@ namespace OpenTelemetry.Exporter.Prometheus.Tests .AddMeter(MeterName) .AddPrometheusExporter(o => { - o.GetUtcNowDateTimeOffset = () => new DateTimeOffset(2021, 9, 30, 22, 30, 0, TimeSpan.Zero); if (o.StartHttpListener) { throw new InvalidOperationException("StartHttpListener should be false on .NET Core 3.1+.");