From ca291422ffd649ae067d11315f7ec8ee377038b0 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Fri, 19 Nov 2021 07:49:47 -0800 Subject: [PATCH] Add support for multiple readers (#2596) --- .../.publicApi/net461/PublicAPI.Unshipped.txt | 1 - .../netstandard2.0/PublicAPI.Unshipped.txt | 1 - src/OpenTelemetry/CHANGELOG.md | 61 +-- .../Metrics/CompositeMetricReader.cs | 8 +- .../Metrics/CompositeMetricReaderExt.cs | 149 ++++++++ .../Metrics/MeterProviderBuilderBase.cs | 5 - src/OpenTelemetry/Metrics/MeterProviderSdk.cs | 346 ++++++++---------- src/OpenTelemetry/Metrics/MetricReader.cs | 11 +- src/OpenTelemetry/Metrics/MetricReaderExt.cs | 233 ++++++++++++ src/OpenTelemetry/ProviderExtensions.cs | 20 +- .../Metrics/MetricAPITest.cs | 22 ++ .../Metrics/MultipleReadersTests.cs | 119 ++++++ 12 files changed, 726 insertions(+), 250 deletions(-) create mode 100644 src/OpenTelemetry/Metrics/CompositeMetricReaderExt.cs create mode 100644 src/OpenTelemetry/Metrics/MetricReaderExt.cs create mode 100644 test/OpenTelemetry.Tests/Metrics/MultipleReadersTests.cs diff --git a/src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt index fa81310cf..c7eaa42d3 100644 --- a/src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt @@ -121,7 +121,6 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool -static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func> static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration virtual OpenTelemetry.BaseExporter.OnForceFlush(int timeoutMilliseconds) -> bool diff --git a/src/OpenTelemetry/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt index fa81310cf..c7eaa42d3 100644 --- a/src/OpenTelemetry/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/netstandard2.0/PublicAPI.Unshipped.txt @@ -121,7 +121,6 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool -static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func> static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration virtual OpenTelemetry.BaseExporter.OnForceFlush(int timeoutMilliseconds) -> bool diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index c80a6f22c..871bfc5fc 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -26,30 +26,33 @@ ([#2542](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2542)) * Added wildcard support for AddMeter. -([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459)) + ([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459)) + +* Add support for multiple Metric readers + ([#2596](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2596)) ## 1.2.0-beta1 Released 2021-Oct-08 -* Exception from Observable instrument callbacks does not - result in entire metrics being lost. +* Exception from Observable instrument callbacks does not result in entire + metrics being lost. -* SDK is allocation-free on recording of measurements with - upto 8 tags. +* SDK is allocation-free on recording of measurements with upto 8 tags. * TracerProviderBuilder.AddLegacySource now supports wildcard activity names. ([#2183](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2183)) -* Instrument and View names are validated - [according with the spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument). +* Instrument and View names are validated [according with the + spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument). ([#2470](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2470)) ## 1.2.0-alpha4 Released 2021-Sep-23 -* `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry items. +* `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry + items. ([#2331](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2331)) * Changed `CompositeProcessor.OnForceFlush` to meet with the spec requirement. Now the SDK will invoke `ForceFlush` on all registered @@ -60,14 +63,14 @@ Released 2021-Sep-23 Released 2021-Sep-13 -* Metrics perf improvements, bug fixes. - Replace MetricProcessor with MetricReader. +* Metrics perf improvements, bug fixes. Replace MetricProcessor with + MetricReader. ([#2306](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2306)) -* Add `BatchExportActivityProcessorOptions` which supports field value overriding - using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, - `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` - envionmental variables as defined in the +* Add `BatchExportActivityProcessorOptions` which supports field value + overriding using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, + `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` envionmental + variables as defined in the [specification](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.5.0/specification/sdk-environment-variables.md#batch-span-processor). ([#2219](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2219)) @@ -75,21 +78,24 @@ Released 2021-Sep-13 Released 2021-Aug-24 -* More Metrics features. All instrument types, push/pull - exporters, Delta/Cumulative temporality supported. +* More Metrics features. All instrument types, push/pull exporters, + Delta/Cumulative temporality supported. -* `ResourceBuilder.CreateDefault` has detectors for - `OTEL_RESOURCE_ATTRIBUTES`, `OTEL_SERVICE_NAME` environment variables - so that explicit `AddEnvironmentVariableDetector` call is not needed. ([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247)) +* `ResourceBuilder.CreateDefault` has detectors for `OTEL_RESOURCE_ATTRIBUTES`, + `OTEL_SERVICE_NAME` environment variables so that explicit + `AddEnvironmentVariableDetector` call is not needed. + ([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247)) * `ResourceBuilder.AddEnvironmentVariableDetector` handles `OTEL_SERVICE_NAME` - environmental variable. ([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209)) + environmental variable. + ([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209)) -* Removes upper constraint for Microsoft.Extensions.Logging - dependencies. ([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179)) +* Removes upper constraint for Microsoft.Extensions.Logging dependencies. + ([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179)) -* OpenTelemetryLogger modified to not throw, when the - formatter supplied in ILogger.Log call is null. ([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200)) +* OpenTelemetryLogger modified to not throw, when the formatter supplied in + ILogger.Log call is null. + ([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200)) ## 1.2.0-alpha1 @@ -100,7 +106,8 @@ Released 2021-Jul-23 ([#2174](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2174)) * Removes .NET Framework 4.5.2, .NET 4.6 support. The minimum .NET Framework - version supported is .NET 4.6.1. ([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138)) + version supported is .NET 4.6.1. + ([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138)) ## 1.1.0 @@ -137,8 +144,8 @@ Released 2021-May-11 Released 2021-Apr-23 * Use `AssemblyFileVersionAttribute` instead of `FileVersionInfo.GetVersionInfo` - to get the SDK version attribute to ensure that it works when the assembly - is not loaded directly from a file on disk + to get the SDK version attribute to ensure that it works when the assembly is + not loaded directly from a file on disk ([#1908](https://github.com/open-telemetry/opentelemetry-dotnet/issues/1908)) ## 1.1.0-beta1 diff --git a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs index 3452a24a1..36f799670 100644 --- a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs +++ b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs @@ -22,11 +22,15 @@ using OpenTelemetry.Internal; namespace OpenTelemetry.Metrics { - internal sealed class CompositeMetricReader : MetricReader + /// + /// CompositeMetricReader that does not deal with adding metrics and recording measurements. + /// + internal sealed partial class CompositeMetricReader : MetricReader { private readonly DoublyLinkedListNode head; private DoublyLinkedListNode tail; private bool disposed; + private int count; public CompositeMetricReader(IEnumerable readers) { @@ -40,6 +44,7 @@ namespace OpenTelemetry.Metrics this.head = new DoublyLinkedListNode(iter.Current); this.tail = this.head; + this.count++; while (iter.MoveNext()) { @@ -57,6 +62,7 @@ namespace OpenTelemetry.Metrics }; this.tail.Next = node; this.tail = node; + this.count++; return this; } diff --git a/src/OpenTelemetry/Metrics/CompositeMetricReaderExt.cs b/src/OpenTelemetry/Metrics/CompositeMetricReaderExt.cs new file mode 100644 index 000000000..11693017f --- /dev/null +++ b/src/OpenTelemetry/Metrics/CompositeMetricReaderExt.cs @@ -0,0 +1,149 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.Metrics; + +namespace OpenTelemetry.Metrics +{ + /// + /// CompositeMetricReader that deals with adding metrics and recording measurements. + /// + internal sealed partial class CompositeMetricReader + { + internal List AddMetricsWithNoViews(Instrument instrument) + { + var metrics = new List(this.count); + for (var cur = this.head; cur != null; cur = cur.Next) + { + var metric = cur.Value.AddMetricWithNoViews(instrument); + metrics.Add(metric); + } + + return metrics; + } + + internal void RecordSingleStreamLongMeasurements(List metrics, long value, ReadOnlySpan> tags) + { + Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metrics[index] != null) + { + cur.Value.RecordSingleStreamLongMeasurement(metrics[index], value, tags); + } + + index++; + } + } + + internal void RecordSingleStreamDoubleMeasurements(List metrics, double value, ReadOnlySpan> tags) + { + Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metrics[index] != null) + { + cur.Value.RecordSingleStreamDoubleMeasurement(metrics[index], value, tags); + } + + index++; + } + } + + internal List> AddMetricsSuperListWithViews(Instrument instrument, List metricStreamConfigs) + { + var metricsSuperList = new List>(this.count); + for (var cur = this.head; cur != null; cur = cur.Next) + { + var metrics = cur.Value.AddMetricsListWithViews(instrument, metricStreamConfigs); + metricsSuperList.Add(metrics); + } + + return metricsSuperList; + } + + internal void RecordLongMeasurements(List> metricsSuperList, long value, ReadOnlySpan> tags) + { + Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metricsSuperList[index].Count > 0) + { + cur.Value.RecordLongMeasurement(metricsSuperList[index], value, tags); + } + + index++; + } + } + + internal void RecordDoubleMeasurements(List> metricsSuperList, double value, ReadOnlySpan> tags) + { + Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metricsSuperList[index].Count > 0) + { + cur.Value.RecordDoubleMeasurement(metricsSuperList[index], value, tags); + } + + index++; + } + } + + internal void CompleteSingleStreamMeasurements(List metrics) + { + Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metrics[index] != null) + { + cur.Value.CompleteSingleStreamMeasurement(metrics[index]); + } + + index++; + } + } + + internal void CompleteMesaurements(List> metricsSuperList) + { + Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers."); + + int index = 0; + for (var cur = this.head; cur != null; cur = cur.Next) + { + if (metricsSuperList[index].Count > 0) + { + cur.Value.CompleteMeasurement(metricsSuperList[index]); + } + + index++; + } + } + } +} diff --git a/src/OpenTelemetry/Metrics/MeterProviderBuilderBase.cs b/src/OpenTelemetry/Metrics/MeterProviderBuilderBase.cs index b797d5a4a..2bc47af6a 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderBuilderBase.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderBuilderBase.cs @@ -71,11 +71,6 @@ namespace OpenTelemetry.Metrics internal MeterProviderBuilder AddReader(MetricReader reader) { - if (this.MetricReaders.Count >= 1) - { - throw new InvalidOperationException("Only one Metricreader is allowed."); - } - this.MetricReaders.Add(reader); return this; } diff --git a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs index c6de17751..63448e2c4 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs @@ -27,18 +27,13 @@ namespace OpenTelemetry.Metrics { internal sealed class MeterProviderSdk : MeterProvider { - internal const int MaxMetrics = 1000; internal int ShutdownCount; - private readonly Metric[] metrics; - private readonly Metric[] metricsCurrentBatch; private readonly List instrumentations = new List(); private readonly List> viewConfigs; private readonly object collectLock = new object(); - private readonly object instrumentCreationLock = new object(); - private readonly HashSet metricStreamNames = new HashSet(StringComparer.OrdinalIgnoreCase); private readonly MeterListener listener; private readonly MetricReader reader; - private int metricIndex = -1; + private readonly CompositeMetricReader compositeMetricReader; private bool disposed; internal MeterProviderSdk( @@ -50,10 +45,6 @@ namespace OpenTelemetry.Metrics { this.Resource = resource; this.viewConfigs = viewConfigs; - this.metrics = new Metric[MaxMetrics]; - this.metricsCurrentBatch = new Metric[MaxMetrics]; - - AggregationTemporality temporality = AggregationTemporality.Cumulative; foreach (var reader in readers) { @@ -61,10 +52,6 @@ namespace OpenTelemetry.Metrics reader.SetParentProvider(this); - // TODO: Actually support multiple readers. - // Currently the last reader's temporality wins. - temporality = reader.PreferredAggregationTemporality; - if (this.reader == null) { this.reader = reader; @@ -79,6 +66,8 @@ namespace OpenTelemetry.Metrics } } + this.compositeMetricReader = this.reader as CompositeMetricReader; + if (instrumentationFactories.Any()) { foreach (var instrumentationFactory in instrumentationFactories) @@ -107,6 +96,9 @@ namespace OpenTelemetry.Metrics this.listener = new MeterListener(); var viewConfigCount = this.viewConfigs.Count; + + // We expect that all the readers to be added are provided before MeterProviderSdk is built. + // If there are no readers added, we do not enable measurements for the instruments. if (viewConfigCount > 0) { this.listener.InstrumentPublished = (instrument, listener) => @@ -141,73 +133,23 @@ namespace OpenTelemetry.Metrics metricStreamConfigs.Add(null); } - var maxCountMetricsToBeCreated = metricStreamConfigs.Count; - - // Create list with initial capacity as the max metric count. - // Due to duplicate/max limit, we may not end up using them - // all, and that memory is wasted until Meter disposed. - // TODO: Revisit to see if we need to do metrics.TrimExcess() - var metrics = new List(maxCountMetricsToBeCreated); - lock (this.instrumentCreationLock) + if (this.reader != null) { - for (int i = 0; i < maxCountMetricsToBeCreated; i++) + if (this.compositeMetricReader == null) { - var metricStreamConfig = metricStreamConfigs[i]; - var meterName = instrument.Meter.Name; - var metricName = metricStreamConfig?.Name ?? instrument.Name; - var metricStreamName = $"{meterName}.{metricName}"; - - if (!MeterProviderBuilderSdk.IsValidInstrumentName(metricName)) + var metrics = this.reader.AddMetricsListWithViews(instrument, metricStreamConfigs); + if (metrics.Count > 0) { - OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored( - metricName, - instrument.Meter.Name, - "Metric name is invalid.", - "The name must comply with the OpenTelemetry specification."); - - continue; - } - - if (this.metricStreamNames.Contains(metricStreamName)) - { - // TODO: Log that instrument is ignored - // as the resulting Metric name is conflicting - // with existing name. - continue; - } - - if (metricStreamConfig?.Aggregation == Aggregation.Drop) - { - // TODO: Log that instrument is ignored - // as user explicitly asked to drop it - // with View. - continue; - } - - var index = ++this.metricIndex; - if (index >= MaxMetrics) - { - // TODO: Log that instrument is ignored - // as max number of Metrics have reached. - } - else - { - Metric metric; - var metricDescription = metricStreamConfig?.Description ?? instrument.Description; - string[] tagKeysInteresting = metricStreamConfig?.TagKeys; - double[] histogramBucketBounds = (metricStreamConfig is ExplicitBucketHistogramConfiguration histogramConfig - && histogramConfig.Boundaries != null) ? histogramConfig.Boundaries : null; - metric = new Metric(instrument, temporality, metricName, metricDescription, histogramBucketBounds, tagKeysInteresting); - - this.metrics[index] = metric; - metrics.Add(metric); - this.metricStreamNames.Add(metricStreamName); + listener.EnableMeasurementEvents(instrument, metrics); } } - - if (metrics.Count > 0) + else { - listener.EnableMeasurementEvents(instrument, metrics); + var metricsSuperList = this.compositeMetricReader.AddMetricsSuperListWithViews(instrument, metricStreamConfigs); + if (metricsSuperList.Any(metrics => metrics.Count > 0)) + { + listener.EnableMeasurementEvents(instrument, metricsSuperList); + } } } }; @@ -247,33 +189,25 @@ namespace OpenTelemetry.Metrics return; } - var meterName = instrument.Meter.Name; - var metricName = instrument.Name; - var metricStreamName = $"{meterName}.{metricName}"; - Metric metric = null; - lock (this.instrumentCreationLock) + if (this.reader != null) { - if (this.metricStreamNames.Contains(metricStreamName)) + if (this.compositeMetricReader == null) { - OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Metric name conflicting with existing name.", "Either change the name of the instrument or change name using View."); - return; - } - - var index = ++this.metricIndex; - if (index >= MaxMetrics) - { - OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Maximum allowed Metrics for the provider exceeded.", "Use views to drop unused instruments. Or configure Provider to allow higher limit."); - return; + var metric = this.reader.AddMetricWithNoViews(instrument); + if (metric != null) + { + listener.EnableMeasurementEvents(instrument, metric); + } } else { - metric = new Metric(instrument, temporality, metricName, instrument.Description); - this.metrics[index] = metric; - this.metricStreamNames.Add(metricStreamName); + var metrics = this.compositeMetricReader.AddMetricsWithNoViews(instrument); + if (metrics.Any(metric => metric != null)) + { + listener.EnableMeasurementEvents(instrument, metrics); + } } } - - listener.EnableMeasurementEvents(instrument, metric); } catch (Exception) { @@ -311,168 +245,175 @@ namespace OpenTelemetry.Metrics internal void MeasurementsCompletedSingleStream(Instrument instrument, object state) { - var metric = state as Metric; - if (metric == null) - { - // TODO: log - return; - } + Debug.Assert(instrument != null, "instrument must be non-null."); - metric.InstrumentDisposed = true; + if (this.compositeMetricReader == null) + { + if (state is not Metric metric) + { + // TODO: log + return; + } + + this.reader.CompleteSingleStreamMeasurement(metric); + } + else + { + if (state is not List metrics) + { + // TODO: log + return; + } + + this.compositeMetricReader.CompleteSingleStreamMeasurements(metrics); + } } internal void MeasurementsCompleted(Instrument instrument, object state) { - var metrics = state as List; - if (metrics == null) - { - // TODO: log - return; - } + Debug.Assert(instrument != null, "instrument must be non-null."); - foreach (var metric in metrics) + if (this.compositeMetricReader == null) { - metric.InstrumentDisposed = true; + if (state is not List metrics) + { + // TODO: log + return; + } + + this.reader.CompleteMeasurement(metrics); + } + else + { + if (state is not List> metricsSuperList) + { + // TODO: log + return; + } + + this.compositeMetricReader.CompleteMesaurements(metricsSuperList); } } internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan> tagsRos, object state) { - // Get Instrument State - var metrics = state as List; - Debug.Assert(instrument != null, "instrument must be non-null."); - if (metrics == null) - { - // TODO: log - return; - } - if (metrics.Count == 1) + if (this.compositeMetricReader == null) { - // special casing the common path - // as this is faster than the - // foreach, when count is 1. - metrics[0].UpdateDouble(value, tagsRos); + if (state is not List metrics) + { + // TODO: log + return; + } + + this.reader.RecordDoubleMeasurement(metrics, value, tagsRos); } else { - foreach (var metric in metrics) + if (state is not List> metricsSuperList) { - metric.UpdateDouble(value, tagsRos); + // TODO: log + return; } + + this.compositeMetricReader.RecordDoubleMeasurements(metricsSuperList, value, tagsRos); } } internal void MeasurementRecordedLong(Instrument instrument, long value, ReadOnlySpan> tagsRos, object state) { - // Get Instrument State - var metrics = state as List; - Debug.Assert(instrument != null, "instrument must be non-null."); - if (metrics == null) - { - // TODO: log - return; - } - if (metrics.Count == 1) + if (this.compositeMetricReader == null) { - // special casing the common path - // as this is faster than the - // foreach, when count is 1. - metrics[0].UpdateLong(value, tagsRos); + if (state is not List metrics) + { + // TODO: log + return; + } + + this.reader.RecordLongMeasurement(metrics, value, tagsRos); } else { - foreach (var metric in metrics) + if (state is not List> metricsSuperList) { - metric.UpdateLong(value, tagsRos); + // TODO: log + return; } + + this.compositeMetricReader.RecordLongMeasurements(metricsSuperList, value, tagsRos); } } internal void MeasurementRecordedLongSingleStream(Instrument instrument, long value, ReadOnlySpan> tagsRos, object state) { - // Get Instrument State - var metric = state as Metric; - Debug.Assert(instrument != null, "instrument must be non-null."); - if (metric == null) - { - // TODO: log - return; - } - metric.UpdateLong(value, tagsRos); + if (this.compositeMetricReader == null) + { + if (state is not Metric metric) + { + // TODO: log + return; + } + + this.reader.RecordSingleStreamLongMeasurement(metric, value, tagsRos); + } + else + { + if (state is not List metrics) + { + // TODO: log + return; + } + + this.compositeMetricReader.RecordSingleStreamLongMeasurements(metrics, value, tagsRos); + } } internal void MeasurementRecordedDoubleSingleStream(Instrument instrument, double value, ReadOnlySpan> tagsRos, object state) { - // Get Instrument State - var metric = state as Metric; - Debug.Assert(instrument != null, "instrument must be non-null."); - if (metric == null) - { - // TODO: log - return; - } - metric.UpdateDouble(value, tagsRos); + if (this.compositeMetricReader == null) + { + if (state is not Metric metric) + { + // TODO: log + return; + } + + this.reader.RecordSingleStreamDoubleMeasurement(metric, value, tagsRos); + } + else + { + if (state is not List metrics) + { + // TODO: log + return; + } + + this.compositeMetricReader.RecordSingleStreamDoubleMeasurements(metrics, value, tagsRos); + } } - internal Batch Collect() + internal void CollectObservableInstruments() { lock (this.collectLock) { + // Record all observable instruments try { - // Record all observable instruments - try - { - this.listener.RecordObservableInstruments(); - } - catch (Exception exception) - { - // TODO: - // It doesn't looks like we can find which instrument callback - // threw. - OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception); - } - - var indexSnapshot = Math.Min(this.metricIndex, MaxMetrics - 1); - var target = indexSnapshot + 1; - int metricCountCurrentBatch = 0; - for (int i = 0; i < target; i++) - { - var metric = this.metrics[i]; - int metricPointSize = 0; - if (metric != null) - { - if (metric.InstrumentDisposed) - { - metricPointSize = metric.Snapshot(); - this.metrics[i] = null; - } - else - { - metricPointSize = metric.Snapshot(); - } - - if (metricPointSize > 0) - { - this.metricsCurrentBatch[metricCountCurrentBatch++] = metric; - } - } - } - - return (metricCountCurrentBatch > 0) ? new Batch(this.metricsCurrentBatch, metricCountCurrentBatch) : default; + this.listener.RecordObservableInstruments(); } - catch (Exception) + catch (Exception exception) { - // TODO: Log - return default; + // TODO: + // It doesn't looks like we can find which instrument callback + // threw. + OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception); } } } @@ -538,6 +479,7 @@ namespace OpenTelemetry.Metrics // Wait for up to 5 seconds grace period this.reader?.Shutdown(5000); this.reader?.Dispose(); + this.compositeMetricReader?.Dispose(); this.listener.Dispose(); } diff --git a/src/OpenTelemetry/Metrics/MetricReader.cs b/src/OpenTelemetry/Metrics/MetricReader.cs index 73f07f05f..99c5c05c2 100644 --- a/src/OpenTelemetry/Metrics/MetricReader.cs +++ b/src/OpenTelemetry/Metrics/MetricReader.cs @@ -22,7 +22,10 @@ using OpenTelemetry.Internal; namespace OpenTelemetry.Metrics { - public abstract class MetricReader : IDisposable + /// + /// MetricReader which does not deal with individual metrics. + /// + public abstract partial class MetricReader : IDisposable { private const AggregationTemporality CumulativeAndDelta = AggregationTemporality.Cumulative | AggregationTemporality.Delta; private readonly object newTaskLock = new object(); @@ -211,8 +214,10 @@ namespace OpenTelemetry.Metrics ? null : Stopwatch.StartNew(); - var collectMetric = this.ParentProvider.GetMetricCollect(); - var metrics = collectMetric(); + var collectObservableInstruments = this.ParentProvider.GetObservableInstrumentCollectCallback(); + collectObservableInstruments(); + + var metrics = this.GetMetricsBatch(); if (sw == null) { diff --git a/src/OpenTelemetry/Metrics/MetricReaderExt.cs b/src/OpenTelemetry/Metrics/MetricReaderExt.cs new file mode 100644 index 000000000..902a37e72 --- /dev/null +++ b/src/OpenTelemetry/Metrics/MetricReaderExt.cs @@ -0,0 +1,233 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Metrics +{ + /// + /// MetricReader which processes individual metrics. + /// + public abstract partial class MetricReader + { + internal const int MaxMetrics = 1000; + private readonly Metric[] metrics = new Metric[MaxMetrics]; + private readonly Metric[] metricsCurrentBatch = new Metric[MaxMetrics]; + private readonly HashSet metricStreamNames = new HashSet(StringComparer.OrdinalIgnoreCase); + private readonly object instrumentCreationLock = new object(); + private int metricIndex = -1; + + internal Metric AddMetricWithNoViews(Instrument instrument) + { + var meterName = instrument.Meter.Name; + var metricName = instrument.Name; + var metricStreamName = $"{meterName}.{metricName}"; + lock (this.instrumentCreationLock) + { + if (this.metricStreamNames.Contains(metricStreamName)) + { + OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Metric name conflicting with existing name.", "Either change the name of the instrument or change name using View."); + return null; + } + + var index = ++this.metricIndex; + if (index >= MaxMetrics) + { + OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Maximum allowed Metrics for the provider exceeded.", "Use views to drop unused instruments. Or configure Provider to allow higher limit."); + return null; + } + else + { + var metric = new Metric(instrument, this.preferredAggregationTemporality, metricName, instrument.Description); + this.metrics[index] = metric; + this.metricStreamNames.Add(metricStreamName); + return metric; + } + } + } + + internal void RecordSingleStreamLongMeasurement(Metric metric, long value, ReadOnlySpan> tags) + { + metric.UpdateLong(value, tags); + } + + internal void RecordSingleStreamDoubleMeasurement(Metric metric, double value, ReadOnlySpan> tags) + { + metric.UpdateDouble(value, tags); + } + + internal List AddMetricsListWithViews(Instrument instrument, List metricStreamConfigs) + { + var maxCountMetricsToBeCreated = metricStreamConfigs.Count; + + // Create list with initial capacity as the max metric count. + // Due to duplicate/max limit, we may not end up using them + // all, and that memory is wasted until Meter disposed. + // TODO: Revisit to see if we need to do metrics.TrimExcess() + var metrics = new List(maxCountMetricsToBeCreated); + lock (this.instrumentCreationLock) + { + for (int i = 0; i < maxCountMetricsToBeCreated; i++) + { + var metricStreamConfig = metricStreamConfigs[i]; + var meterName = instrument.Meter.Name; + var metricName = metricStreamConfig?.Name ?? instrument.Name; + var metricStreamName = $"{meterName}.{metricName}"; + + if (!MeterProviderBuilderSdk.IsValidInstrumentName(metricName)) + { + OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored( + metricName, + instrument.Meter.Name, + "Metric name is invalid.", + "The name must comply with the OpenTelemetry specification."); + + continue; + } + + if (this.metricStreamNames.Contains(metricStreamName)) + { + // TODO: Log that instrument is ignored + // as the resulting Metric name is conflicting + // with existing name. + continue; + } + + if (metricStreamConfig?.Aggregation == Aggregation.Drop) + { + // TODO: Log that instrument is ignored + // as user explicitly asked to drop it + // with View. + continue; + } + + var index = ++this.metricIndex; + if (index >= MaxMetrics) + { + // TODO: Log that instrument is ignored + // as max number of Metrics have reached. + } + else + { + Metric metric; + var metricDescription = metricStreamConfig?.Description ?? instrument.Description; + string[] tagKeysInteresting = metricStreamConfig?.TagKeys; + double[] histogramBucketBounds = (metricStreamConfig is ExplicitBucketHistogramConfiguration histogramConfig + && histogramConfig.Boundaries != null) ? histogramConfig.Boundaries : null; + metric = new Metric(instrument, this.preferredAggregationTemporality, metricName, metricDescription, histogramBucketBounds, tagKeysInteresting); + + this.metrics[index] = metric; + metrics.Add(metric); + this.metricStreamNames.Add(metricStreamName); + } + } + + return metrics; + } + } + + internal void RecordLongMeasurement(List metrics, long value, ReadOnlySpan> tags) + { + if (metrics.Count == 1) + { + // special casing the common path + // as this is faster than the + // foreach, when count is 1. + metrics[0].UpdateLong(value, tags); + } + else + { + foreach (var metric in metrics) + { + metric.UpdateLong(value, tags); + } + } + } + + internal void RecordDoubleMeasurement(List metrics, double value, ReadOnlySpan> tags) + { + if (metrics.Count == 1) + { + // special casing the common path + // as this is faster than the + // foreach, when count is 1. + metrics[0].UpdateDouble(value, tags); + } + else + { + foreach (var metric in metrics) + { + metric.UpdateDouble(value, tags); + } + } + } + + internal void CompleteSingleStreamMeasurement(Metric metric) + { + metric.InstrumentDisposed = true; + } + + internal void CompleteMeasurement(List metrics) + { + foreach (var metric in metrics) + { + metric.InstrumentDisposed = true; + } + } + + private Batch GetMetricsBatch() + { + try + { + var indexSnapshot = Math.Min(this.metricIndex, MaxMetrics - 1); + var target = indexSnapshot + 1; + int metricCountCurrentBatch = 0; + for (int i = 0; i < target; i++) + { + var metric = this.metrics[i]; + int metricPointSize = 0; + if (metric != null) + { + if (metric.InstrumentDisposed) + { + metricPointSize = metric.Snapshot(); + this.metrics[i] = null; + } + else + { + metricPointSize = metric.Snapshot(); + } + + if (metricPointSize > 0) + { + this.metricsCurrentBatch[metricCountCurrentBatch++] = metric; + } + } + } + + return (metricCountCurrentBatch > 0) ? new Batch(this.metricsCurrentBatch, metricCountCurrentBatch) : default; + } + catch (Exception) + { + // TODO: Log + return default; + } + } + } +} diff --git a/src/OpenTelemetry/ProviderExtensions.cs b/src/OpenTelemetry/ProviderExtensions.cs index fa27126e2..1ae3a85ec 100644 --- a/src/OpenTelemetry/ProviderExtensions.cs +++ b/src/OpenTelemetry/ProviderExtensions.cs @@ -50,16 +50,6 @@ namespace OpenTelemetry return Resource.Empty; } - public static Func> GetMetricCollect(this BaseProvider baseProvider) - { - if (baseProvider is MeterProviderSdk meterProviderSdk) - { - return meterProviderSdk.Collect; - } - - return null; - } - /// /// Gets the associated with the . /// @@ -69,5 +59,15 @@ namespace OpenTelemetry { return ResourceBuilder.CreateDefault().Build(); } + + internal static Action GetObservableInstrumentCollectCallback(this BaseProvider baseProvider) + { + if (baseProvider is MeterProviderSdk meterProviderSdk) + { + return meterProviderSdk.CollectObservableInstruments; + } + + return null; + } } } diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index 2ab58a86c..3c5ba50fa 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -690,6 +690,28 @@ namespace OpenTelemetry.Metrics.Tests Assert.Equal(name, metric.Name); } + [Theory] + [InlineData(false)] + [InlineData(true)] + public void SetupSdkProviderWithNoReader(bool hasViews) + { + // This test ensures that MeterProviderSdk can be set up without any reader + using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{hasViews}"); + var meterProviderBuilder = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name); + + if (hasViews) + { + meterProviderBuilder.AddView("counter", "renamedCounter"); + } + + using var meterProvider = meterProviderBuilder.Build(); + + var counter = meter.CreateCounter("counter"); + + counter.Add(10, new KeyValuePair("key", "value")); + } + private static long GetLongSum(List metrics) { long sum = 0; diff --git a/test/OpenTelemetry.Tests/Metrics/MultipleReadersTests.cs b/test/OpenTelemetry.Tests/Metrics/MultipleReadersTests.cs new file mode 100644 index 000000000..ec6f250af --- /dev/null +++ b/test/OpenTelemetry.Tests/Metrics/MultipleReadersTests.cs @@ -0,0 +1,119 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using OpenTelemetry.Exporter; +using OpenTelemetry.Tests; +using Xunit; + +namespace OpenTelemetry.Metrics.Tests +{ + public class MultipleReadersTests + { + [Theory] + [InlineData(AggregationTemporality.Delta, false)] + [InlineData(AggregationTemporality.Delta, true)] + [InlineData(AggregationTemporality.Cumulative, false)] + [InlineData(AggregationTemporality.Cumulative, true)] + public void SdkSupportsMultipleReaders(AggregationTemporality aggregationTemporality, bool hasViews) + { + var exporterdMetricItems1 = new List(); + using var deltaMetricExporter1 = new InMemoryExporter(exporterdMetricItems1); + using var deltaMetricReader1 = new BaseExportingMetricReader(deltaMetricExporter1) + { + PreferredAggregationTemporality = AggregationTemporality.Delta, + }; + + var exporterdMetricItems2 = new List(); + using var deltaMetricExporter2 = new InMemoryExporter(exporterdMetricItems2); + using var deltaMetricReader2 = new BaseExportingMetricReader(deltaMetricExporter2) + { + PreferredAggregationTemporality = aggregationTemporality, + }; + using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{aggregationTemporality}.{hasViews}"); + + var counter = meter.CreateCounter("counter"); + + int index = 0; + var values = new long[] { 100, 200, 300, 400 }; + long GetValue() => values[index++]; + var gauge = meter.CreateObservableGauge("gauge", () => GetValue()); + + var meterProviderBuilder = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddReader(deltaMetricReader1) + .AddReader(deltaMetricReader2); + + if (hasViews) + { + meterProviderBuilder.AddView("counter", "renamedCounter"); + } + + using var meterProvider = meterProviderBuilder.Build(); + + counter.Add(10, new KeyValuePair("key", "value")); + + meterProvider.ForceFlush(); + + Assert.Equal(2, exporterdMetricItems1.Count); + Assert.Equal(2, exporterdMetricItems2.Count); + + // Check value exported for Counter + this.AssertLongSumValueForMetric(exporterdMetricItems1[0], 10); + this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 10); + + // Check value exported for Gauge + this.AssertLongSumValueForMetric(exporterdMetricItems1[1], 100); + this.AssertLongSumValueForMetric(exporterdMetricItems2[1], 200); + + exporterdMetricItems1.Clear(); + exporterdMetricItems2.Clear(); + + counter.Add(15, new KeyValuePair("key", "value")); + + meterProvider.ForceFlush(); + + Assert.Equal(2, exporterdMetricItems1.Count); + Assert.Equal(2, exporterdMetricItems2.Count); + + // Check value exported for Counter + this.AssertLongSumValueForMetric(exporterdMetricItems1[0], 15); + if (aggregationTemporality == AggregationTemporality.Delta) + { + this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 15); + } + else + { + this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 25); + } + + // Check value exported for Gauge + this.AssertLongSumValueForMetric(exporterdMetricItems1[1], 300); + this.AssertLongSumValueForMetric(exporterdMetricItems2[1], 400); + } + + private void AssertLongSumValueForMetric(Metric metric, long value) + { + using var metricPoints = metric.GetMetricPoints(); + var metricPointsEnumerator = metricPoints.GetEnumerator(); + Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric + ref var metricPointForFirstExport = ref metricPointsEnumerator.Current; + Assert.Equal(value, metricPointForFirstExport.LongValue); + } + } +}