Add support for multiple readers (#2596)

This commit is contained in:
Utkarsh Umesan Pillai 2021-11-19 07:49:47 -08:00 committed by GitHub
parent 03313e7b02
commit ca291422ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 726 additions and 250 deletions

View File

@ -121,7 +121,6 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met
static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func<OpenTelemetry.Batch<OpenTelemetry.Metrics.Metric>>
static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder
static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration
virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool

View File

@ -121,7 +121,6 @@ static OpenTelemetry.Metrics.MetricTypeExtensions.IsGauge(this OpenTelemetry.Met
static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsHistogram(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsLong(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool static OpenTelemetry.Metrics.MetricTypeExtensions.IsSum(this OpenTelemetry.Metrics.MetricType self) -> bool
static OpenTelemetry.ProviderExtensions.GetMetricCollect(this OpenTelemetry.BaseProvider baseProvider) -> System.Func<OpenTelemetry.Batch<OpenTelemetry.Metrics.Metric>>
static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder static OpenTelemetry.Sdk.CreateMeterProviderBuilder() -> OpenTelemetry.Metrics.MeterProviderBuilder
static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration static readonly OpenTelemetry.Metrics.MetricStreamConfiguration.Drop -> OpenTelemetry.Metrics.MetricStreamConfiguration
virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool virtual OpenTelemetry.BaseExporter<T>.OnForceFlush(int timeoutMilliseconds) -> bool

View File

@ -26,30 +26,33 @@
([#2542](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2542)) ([#2542](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2542))
* Added wildcard support for AddMeter. * Added wildcard support for AddMeter.
([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459)) ([#2459](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2459))
* Add support for multiple Metric readers
([#2596](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2596))
## 1.2.0-beta1 ## 1.2.0-beta1
Released 2021-Oct-08 Released 2021-Oct-08
* Exception from Observable instrument callbacks does not * Exception from Observable instrument callbacks does not result in entire
result in entire metrics being lost. metrics being lost.
* SDK is allocation-free on recording of measurements with * SDK is allocation-free on recording of measurements with upto 8 tags.
upto 8 tags.
* TracerProviderBuilder.AddLegacySource now supports wildcard activity names. * TracerProviderBuilder.AddLegacySource now supports wildcard activity names.
([#2183](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2183)) ([#2183](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2183))
* Instrument and View names are validated * Instrument and View names are validated [according with the
[according with the spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument). spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument).
([#2470](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2470)) ([#2470](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2470))
## 1.2.0-alpha4 ## 1.2.0-alpha4
Released 2021-Sep-23 Released 2021-Sep-23
* `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry items. * `BatchExportProcessor.OnShutdown` will now log the count of dropped telemetry
items.
([#2331](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2331)) ([#2331](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2331))
* Changed `CompositeProcessor<T>.OnForceFlush` to meet with the spec * Changed `CompositeProcessor<T>.OnForceFlush` to meet with the spec
requirement. Now the SDK will invoke `ForceFlush` on all registered requirement. Now the SDK will invoke `ForceFlush` on all registered
@ -60,14 +63,14 @@ Released 2021-Sep-23
Released 2021-Sep-13 Released 2021-Sep-13
* Metrics perf improvements, bug fixes. * Metrics perf improvements, bug fixes. Replace MetricProcessor with
Replace MetricProcessor with MetricReader. MetricReader.
([#2306](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2306)) ([#2306](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2306))
* Add `BatchExportActivityProcessorOptions` which supports field value overriding * Add `BatchExportActivityProcessorOptions` which supports field value
using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, overriding using `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`,
`OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` envionmental
envionmental variables as defined in the variables as defined in the
[specification](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.5.0/specification/sdk-environment-variables.md#batch-span-processor). [specification](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.5.0/specification/sdk-environment-variables.md#batch-span-processor).
([#2219](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2219)) ([#2219](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2219))
@ -75,21 +78,24 @@ Released 2021-Sep-13
Released 2021-Aug-24 Released 2021-Aug-24
* More Metrics features. All instrument types, push/pull * More Metrics features. All instrument types, push/pull exporters,
exporters, Delta/Cumulative temporality supported. Delta/Cumulative temporality supported.
* `ResourceBuilder.CreateDefault` has detectors for * `ResourceBuilder.CreateDefault` has detectors for `OTEL_RESOURCE_ATTRIBUTES`,
`OTEL_RESOURCE_ATTRIBUTES`, `OTEL_SERVICE_NAME` environment variables `OTEL_SERVICE_NAME` environment variables so that explicit
so that explicit `AddEnvironmentVariableDetector` call is not needed. ([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247)) `AddEnvironmentVariableDetector` call is not needed.
([#2247](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2247))
* `ResourceBuilder.AddEnvironmentVariableDetector` handles `OTEL_SERVICE_NAME` * `ResourceBuilder.AddEnvironmentVariableDetector` handles `OTEL_SERVICE_NAME`
environmental variable. ([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209)) environmental variable.
([#2209](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2209))
* Removes upper constraint for Microsoft.Extensions.Logging * Removes upper constraint for Microsoft.Extensions.Logging dependencies.
dependencies. ([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179)) ([#2179](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2179))
* OpenTelemetryLogger modified to not throw, when the * OpenTelemetryLogger modified to not throw, when the formatter supplied in
formatter supplied in ILogger.Log call is null. ([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200)) ILogger.Log call is null.
([#2200](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2200))
## 1.2.0-alpha1 ## 1.2.0-alpha1
@ -100,7 +106,8 @@ Released 2021-Jul-23
([#2174](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2174)) ([#2174](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2174))
* Removes .NET Framework 4.5.2, .NET 4.6 support. The minimum .NET Framework * Removes .NET Framework 4.5.2, .NET 4.6 support. The minimum .NET Framework
version supported is .NET 4.6.1. ([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138)) version supported is .NET 4.6.1.
([#2138](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2138))
## 1.1.0 ## 1.1.0
@ -137,8 +144,8 @@ Released 2021-May-11
Released 2021-Apr-23 Released 2021-Apr-23
* Use `AssemblyFileVersionAttribute` instead of `FileVersionInfo.GetVersionInfo` * Use `AssemblyFileVersionAttribute` instead of `FileVersionInfo.GetVersionInfo`
to get the SDK version attribute to ensure that it works when the assembly to get the SDK version attribute to ensure that it works when the assembly is
is not loaded directly from a file on disk not loaded directly from a file on disk
([#1908](https://github.com/open-telemetry/opentelemetry-dotnet/issues/1908)) ([#1908](https://github.com/open-telemetry/opentelemetry-dotnet/issues/1908))
## 1.1.0-beta1 ## 1.1.0-beta1

View File

@ -22,11 +22,15 @@ using OpenTelemetry.Internal;
namespace OpenTelemetry.Metrics namespace OpenTelemetry.Metrics
{ {
internal sealed class CompositeMetricReader : MetricReader /// <summary>
/// CompositeMetricReader that does not deal with adding metrics and recording measurements.
/// </summary>
internal sealed partial class CompositeMetricReader : MetricReader
{ {
private readonly DoublyLinkedListNode head; private readonly DoublyLinkedListNode head;
private DoublyLinkedListNode tail; private DoublyLinkedListNode tail;
private bool disposed; private bool disposed;
private int count;
public CompositeMetricReader(IEnumerable<MetricReader> readers) public CompositeMetricReader(IEnumerable<MetricReader> readers)
{ {
@ -40,6 +44,7 @@ namespace OpenTelemetry.Metrics
this.head = new DoublyLinkedListNode(iter.Current); this.head = new DoublyLinkedListNode(iter.Current);
this.tail = this.head; this.tail = this.head;
this.count++;
while (iter.MoveNext()) while (iter.MoveNext())
{ {
@ -57,6 +62,7 @@ namespace OpenTelemetry.Metrics
}; };
this.tail.Next = node; this.tail.Next = node;
this.tail = node; this.tail = node;
this.count++;
return this; return this;
} }

View File

@ -0,0 +1,149 @@
// <copyright file="CompositeMetricReaderExt.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
namespace OpenTelemetry.Metrics
{
/// <summary>
/// CompositeMetricReader that deals with adding metrics and recording measurements.
/// </summary>
internal sealed partial class CompositeMetricReader
{
internal List<Metric> AddMetricsWithNoViews(Instrument instrument)
{
var metrics = new List<Metric>(this.count);
for (var cur = this.head; cur != null; cur = cur.Next)
{
var metric = cur.Value.AddMetricWithNoViews(instrument);
metrics.Add(metric);
}
return metrics;
}
internal void RecordSingleStreamLongMeasurements(List<Metric> metrics, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.RecordSingleStreamLongMeasurement(metrics[index], value, tags);
}
index++;
}
}
internal void RecordSingleStreamDoubleMeasurements(List<Metric> metrics, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.RecordSingleStreamDoubleMeasurement(metrics[index], value, tags);
}
index++;
}
}
internal List<List<Metric>> AddMetricsSuperListWithViews(Instrument instrument, List<MetricStreamConfiguration> metricStreamConfigs)
{
var metricsSuperList = new List<List<Metric>>(this.count);
for (var cur = this.head; cur != null; cur = cur.Next)
{
var metrics = cur.Value.AddMetricsListWithViews(instrument, metricStreamConfigs);
metricsSuperList.Add(metrics);
}
return metricsSuperList;
}
internal void RecordLongMeasurements(List<List<Metric>> metricsSuperList, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metricsSuperList[index].Count > 0)
{
cur.Value.RecordLongMeasurement(metricsSuperList[index], value, tags);
}
index++;
}
}
internal void RecordDoubleMeasurements(List<List<Metric>> metricsSuperList, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metricsSuperList[index].Count > 0)
{
cur.Value.RecordDoubleMeasurement(metricsSuperList[index], value, tags);
}
index++;
}
}
internal void CompleteSingleStreamMeasurements(List<Metric> metrics)
{
Debug.Assert(metrics.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metrics[index] != null)
{
cur.Value.CompleteSingleStreamMeasurement(metrics[index]);
}
index++;
}
}
internal void CompleteMesaurements(List<List<Metric>> metricsSuperList)
{
Debug.Assert(metricsSuperList.Count == this.count, "The count of metrics to be updated for a CompositeReader must match the number of individual readers.");
int index = 0;
for (var cur = this.head; cur != null; cur = cur.Next)
{
if (metricsSuperList[index].Count > 0)
{
cur.Value.CompleteMeasurement(metricsSuperList[index]);
}
index++;
}
}
}
}

View File

@ -71,11 +71,6 @@ namespace OpenTelemetry.Metrics
internal MeterProviderBuilder AddReader(MetricReader reader) internal MeterProviderBuilder AddReader(MetricReader reader)
{ {
if (this.MetricReaders.Count >= 1)
{
throw new InvalidOperationException("Only one Metricreader is allowed.");
}
this.MetricReaders.Add(reader); this.MetricReaders.Add(reader);
return this; return this;
} }

View File

@ -27,18 +27,13 @@ namespace OpenTelemetry.Metrics
{ {
internal sealed class MeterProviderSdk : MeterProvider internal sealed class MeterProviderSdk : MeterProvider
{ {
internal const int MaxMetrics = 1000;
internal int ShutdownCount; internal int ShutdownCount;
private readonly Metric[] metrics;
private readonly Metric[] metricsCurrentBatch;
private readonly List<object> instrumentations = new List<object>(); private readonly List<object> instrumentations = new List<object>();
private readonly List<Func<Instrument, MetricStreamConfiguration>> viewConfigs; private readonly List<Func<Instrument, MetricStreamConfiguration>> viewConfigs;
private readonly object collectLock = new object(); private readonly object collectLock = new object();
private readonly object instrumentCreationLock = new object();
private readonly HashSet<string> metricStreamNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
private readonly MeterListener listener; private readonly MeterListener listener;
private readonly MetricReader reader; private readonly MetricReader reader;
private int metricIndex = -1; private readonly CompositeMetricReader compositeMetricReader;
private bool disposed; private bool disposed;
internal MeterProviderSdk( internal MeterProviderSdk(
@ -50,10 +45,6 @@ namespace OpenTelemetry.Metrics
{ {
this.Resource = resource; this.Resource = resource;
this.viewConfigs = viewConfigs; this.viewConfigs = viewConfigs;
this.metrics = new Metric[MaxMetrics];
this.metricsCurrentBatch = new Metric[MaxMetrics];
AggregationTemporality temporality = AggregationTemporality.Cumulative;
foreach (var reader in readers) foreach (var reader in readers)
{ {
@ -61,10 +52,6 @@ namespace OpenTelemetry.Metrics
reader.SetParentProvider(this); reader.SetParentProvider(this);
// TODO: Actually support multiple readers.
// Currently the last reader's temporality wins.
temporality = reader.PreferredAggregationTemporality;
if (this.reader == null) if (this.reader == null)
{ {
this.reader = reader; this.reader = reader;
@ -79,6 +66,8 @@ namespace OpenTelemetry.Metrics
} }
} }
this.compositeMetricReader = this.reader as CompositeMetricReader;
if (instrumentationFactories.Any()) if (instrumentationFactories.Any())
{ {
foreach (var instrumentationFactory in instrumentationFactories) foreach (var instrumentationFactory in instrumentationFactories)
@ -107,6 +96,9 @@ namespace OpenTelemetry.Metrics
this.listener = new MeterListener(); this.listener = new MeterListener();
var viewConfigCount = this.viewConfigs.Count; var viewConfigCount = this.viewConfigs.Count;
// We expect that all the readers to be added are provided before MeterProviderSdk is built.
// If there are no readers added, we do not enable measurements for the instruments.
if (viewConfigCount > 0) if (viewConfigCount > 0)
{ {
this.listener.InstrumentPublished = (instrument, listener) => this.listener.InstrumentPublished = (instrument, listener) =>
@ -141,75 +133,25 @@ namespace OpenTelemetry.Metrics
metricStreamConfigs.Add(null); metricStreamConfigs.Add(null);
} }
var maxCountMetricsToBeCreated = metricStreamConfigs.Count; if (this.reader != null)
// Create list with initial capacity as the max metric count.
// Due to duplicate/max limit, we may not end up using them
// all, and that memory is wasted until Meter disposed.
// TODO: Revisit to see if we need to do metrics.TrimExcess()
var metrics = new List<Metric>(maxCountMetricsToBeCreated);
lock (this.instrumentCreationLock)
{ {
for (int i = 0; i < maxCountMetricsToBeCreated; i++) if (this.compositeMetricReader == null)
{ {
var metricStreamConfig = metricStreamConfigs[i]; var metrics = this.reader.AddMetricsListWithViews(instrument, metricStreamConfigs);
var meterName = instrument.Meter.Name;
var metricName = metricStreamConfig?.Name ?? instrument.Name;
var metricStreamName = $"{meterName}.{metricName}";
if (!MeterProviderBuilderSdk.IsValidInstrumentName(metricName))
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(
metricName,
instrument.Meter.Name,
"Metric name is invalid.",
"The name must comply with the OpenTelemetry specification.");
continue;
}
if (this.metricStreamNames.Contains(metricStreamName))
{
// TODO: Log that instrument is ignored
// as the resulting Metric name is conflicting
// with existing name.
continue;
}
if (metricStreamConfig?.Aggregation == Aggregation.Drop)
{
// TODO: Log that instrument is ignored
// as user explicitly asked to drop it
// with View.
continue;
}
var index = ++this.metricIndex;
if (index >= MaxMetrics)
{
// TODO: Log that instrument is ignored
// as max number of Metrics have reached.
}
else
{
Metric metric;
var metricDescription = metricStreamConfig?.Description ?? instrument.Description;
string[] tagKeysInteresting = metricStreamConfig?.TagKeys;
double[] histogramBucketBounds = (metricStreamConfig is ExplicitBucketHistogramConfiguration histogramConfig
&& histogramConfig.Boundaries != null) ? histogramConfig.Boundaries : null;
metric = new Metric(instrument, temporality, metricName, metricDescription, histogramBucketBounds, tagKeysInteresting);
this.metrics[index] = metric;
metrics.Add(metric);
this.metricStreamNames.Add(metricStreamName);
}
}
if (metrics.Count > 0) if (metrics.Count > 0)
{ {
listener.EnableMeasurementEvents(instrument, metrics); listener.EnableMeasurementEvents(instrument, metrics);
} }
} }
else
{
var metricsSuperList = this.compositeMetricReader.AddMetricsSuperListWithViews(instrument, metricStreamConfigs);
if (metricsSuperList.Any(metrics => metrics.Count > 0))
{
listener.EnableMeasurementEvents(instrument, metricsSuperList);
}
}
}
}; };
// Everything double // Everything double
@ -247,33 +189,25 @@ namespace OpenTelemetry.Metrics
return; return;
} }
var meterName = instrument.Meter.Name; if (this.reader != null)
var metricName = instrument.Name;
var metricStreamName = $"{meterName}.{metricName}";
Metric metric = null;
lock (this.instrumentCreationLock)
{ {
if (this.metricStreamNames.Contains(metricStreamName)) if (this.compositeMetricReader == null)
{ {
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Metric name conflicting with existing name.", "Either change the name of the instrument or change name using View."); var metric = this.reader.AddMetricWithNoViews(instrument);
return; if (metric != null)
{
listener.EnableMeasurementEvents(instrument, metric);
} }
var index = ++this.metricIndex;
if (index >= MaxMetrics)
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Maximum allowed Metrics for the provider exceeded.", "Use views to drop unused instruments. Or configure Provider to allow higher limit.");
return;
} }
else else
{ {
metric = new Metric(instrument, temporality, metricName, instrument.Description); var metrics = this.compositeMetricReader.AddMetricsWithNoViews(instrument);
this.metrics[index] = metric; if (metrics.Any(metric => metric != null))
this.metricStreamNames.Add(metricStreamName); {
listener.EnableMeasurementEvents(instrument, metrics);
}
} }
} }
listener.EnableMeasurementEvents(instrument, metric);
} }
catch (Exception) catch (Exception)
{ {
@ -311,122 +245,163 @@ namespace OpenTelemetry.Metrics
internal void MeasurementsCompletedSingleStream(Instrument instrument, object state) internal void MeasurementsCompletedSingleStream(Instrument instrument, object state)
{ {
var metric = state as Metric; Debug.Assert(instrument != null, "instrument must be non-null.");
if (metric == null)
if (this.compositeMetricReader == null)
{
if (state is not Metric metric)
{ {
// TODO: log // TODO: log
return; return;
} }
metric.InstrumentDisposed = true; this.reader.CompleteSingleStreamMeasurement(metric);
}
else
{
if (state is not List<Metric> metrics)
{
// TODO: log
return;
}
this.compositeMetricReader.CompleteSingleStreamMeasurements(metrics);
}
} }
internal void MeasurementsCompleted(Instrument instrument, object state) internal void MeasurementsCompleted(Instrument instrument, object state)
{ {
var metrics = state as List<Metric>; Debug.Assert(instrument != null, "instrument must be non-null.");
if (metrics == null)
if (this.compositeMetricReader == null)
{
if (state is not List<Metric> metrics)
{ {
// TODO: log // TODO: log
return; return;
} }
foreach (var metric in metrics) this.reader.CompleteMeasurement(metrics);
}
else
{ {
metric.InstrumentDisposed = true; if (state is not List<List<Metric>> metricsSuperList)
{
// TODO: log
return;
}
this.compositeMetricReader.CompleteMesaurements(metricsSuperList);
} }
} }
internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state) internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state)
{ {
// Get Instrument State
var metrics = state as List<Metric>;
Debug.Assert(instrument != null, "instrument must be non-null."); Debug.Assert(instrument != null, "instrument must be non-null.");
if (metrics == null)
if (this.compositeMetricReader == null)
{
if (state is not List<Metric> metrics)
{ {
// TODO: log // TODO: log
return; return;
} }
if (metrics.Count == 1) this.reader.RecordDoubleMeasurement(metrics, value, tagsRos);
{
// special casing the common path
// as this is faster than the
// foreach, when count is 1.
metrics[0].UpdateDouble(value, tagsRos);
} }
else else
{ {
foreach (var metric in metrics) if (state is not List<List<Metric>> metricsSuperList)
{ {
metric.UpdateDouble(value, tagsRos); // TODO: log
return;
} }
this.compositeMetricReader.RecordDoubleMeasurements(metricsSuperList, value, tagsRos);
} }
} }
internal void MeasurementRecordedLong(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state) internal void MeasurementRecordedLong(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state)
{ {
// Get Instrument State
var metrics = state as List<Metric>;
Debug.Assert(instrument != null, "instrument must be non-null."); Debug.Assert(instrument != null, "instrument must be non-null.");
if (metrics == null)
if (this.compositeMetricReader == null)
{
if (state is not List<Metric> metrics)
{ {
// TODO: log // TODO: log
return; return;
} }
if (metrics.Count == 1) this.reader.RecordLongMeasurement(metrics, value, tagsRos);
{
// special casing the common path
// as this is faster than the
// foreach, when count is 1.
metrics[0].UpdateLong(value, tagsRos);
} }
else else
{ {
foreach (var metric in metrics) if (state is not List<List<Metric>> metricsSuperList)
{ {
metric.UpdateLong(value, tagsRos); // TODO: log
return;
} }
this.compositeMetricReader.RecordLongMeasurements(metricsSuperList, value, tagsRos);
} }
} }
internal void MeasurementRecordedLongSingleStream(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state) internal void MeasurementRecordedLongSingleStream(Instrument instrument, long value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state)
{ {
// Get Instrument State
var metric = state as Metric;
Debug.Assert(instrument != null, "instrument must be non-null."); Debug.Assert(instrument != null, "instrument must be non-null.");
if (metric == null)
if (this.compositeMetricReader == null)
{
if (state is not Metric metric)
{ {
// TODO: log // TODO: log
return; return;
} }
metric.UpdateLong(value, tagsRos); this.reader.RecordSingleStreamLongMeasurement(metric, value, tagsRos);
}
else
{
if (state is not List<Metric> metrics)
{
// TODO: log
return;
}
this.compositeMetricReader.RecordSingleStreamLongMeasurements(metrics, value, tagsRos);
}
} }
internal void MeasurementRecordedDoubleSingleStream(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state) internal void MeasurementRecordedDoubleSingleStream(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state)
{ {
// Get Instrument State
var metric = state as Metric;
Debug.Assert(instrument != null, "instrument must be non-null."); Debug.Assert(instrument != null, "instrument must be non-null.");
if (metric == null)
if (this.compositeMetricReader == null)
{
if (state is not Metric metric)
{ {
// TODO: log // TODO: log
return; return;
} }
metric.UpdateDouble(value, tagsRos); this.reader.RecordSingleStreamDoubleMeasurement(metric, value, tagsRos);
}
else
{
if (state is not List<Metric> metrics)
{
// TODO: log
return;
} }
internal Batch<Metric> Collect() this.compositeMetricReader.RecordSingleStreamDoubleMeasurements(metrics, value, tagsRos);
}
}
internal void CollectObservableInstruments()
{ {
lock (this.collectLock) lock (this.collectLock)
{
try
{ {
// Record all observable instruments // Record all observable instruments
try try
@ -440,40 +415,6 @@ namespace OpenTelemetry.Metrics
// threw. // threw.
OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception); OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception);
} }
var indexSnapshot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapshot + 1;
int metricCountCurrentBatch = 0;
for (int i = 0; i < target; i++)
{
var metric = this.metrics[i];
int metricPointSize = 0;
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metricPointSize = metric.Snapshot();
this.metrics[i] = null;
}
else
{
metricPointSize = metric.Snapshot();
}
if (metricPointSize > 0)
{
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
}
}
return (metricCountCurrentBatch > 0) ? new Batch<Metric>(this.metricsCurrentBatch, metricCountCurrentBatch) : default;
}
catch (Exception)
{
// TODO: Log
return default;
}
} }
} }
@ -538,6 +479,7 @@ namespace OpenTelemetry.Metrics
// Wait for up to 5 seconds grace period // Wait for up to 5 seconds grace period
this.reader?.Shutdown(5000); this.reader?.Shutdown(5000);
this.reader?.Dispose(); this.reader?.Dispose();
this.compositeMetricReader?.Dispose();
this.listener.Dispose(); this.listener.Dispose();
} }

View File

@ -22,7 +22,10 @@ using OpenTelemetry.Internal;
namespace OpenTelemetry.Metrics namespace OpenTelemetry.Metrics
{ {
public abstract class MetricReader : IDisposable /// <summary>
/// MetricReader which does not deal with individual metrics.
/// </summary>
public abstract partial class MetricReader : IDisposable
{ {
private const AggregationTemporality CumulativeAndDelta = AggregationTemporality.Cumulative | AggregationTemporality.Delta; private const AggregationTemporality CumulativeAndDelta = AggregationTemporality.Cumulative | AggregationTemporality.Delta;
private readonly object newTaskLock = new object(); private readonly object newTaskLock = new object();
@ -211,8 +214,10 @@ namespace OpenTelemetry.Metrics
? null ? null
: Stopwatch.StartNew(); : Stopwatch.StartNew();
var collectMetric = this.ParentProvider.GetMetricCollect(); var collectObservableInstruments = this.ParentProvider.GetObservableInstrumentCollectCallback();
var metrics = collectMetric(); collectObservableInstruments();
var metrics = this.GetMetricsBatch();
if (sw == null) if (sw == null)
{ {

View File

@ -0,0 +1,233 @@
// <copyright file="MetricReaderExt.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Metrics
{
/// <summary>
/// MetricReader which processes individual metrics.
/// </summary>
public abstract partial class MetricReader
{
internal const int MaxMetrics = 1000;
private readonly Metric[] metrics = new Metric[MaxMetrics];
private readonly Metric[] metricsCurrentBatch = new Metric[MaxMetrics];
private readonly HashSet<string> metricStreamNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
private readonly object instrumentCreationLock = new object();
private int metricIndex = -1;
internal Metric AddMetricWithNoViews(Instrument instrument)
{
var meterName = instrument.Meter.Name;
var metricName = instrument.Name;
var metricStreamName = $"{meterName}.{metricName}";
lock (this.instrumentCreationLock)
{
if (this.metricStreamNames.Contains(metricStreamName))
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Metric name conflicting with existing name.", "Either change the name of the instrument or change name using View.");
return null;
}
var index = ++this.metricIndex;
if (index >= MaxMetrics)
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(metricName, instrument.Meter.Name, "Maximum allowed Metrics for the provider exceeded.", "Use views to drop unused instruments. Or configure Provider to allow higher limit.");
return null;
}
else
{
var metric = new Metric(instrument, this.preferredAggregationTemporality, metricName, instrument.Description);
this.metrics[index] = metric;
this.metricStreamNames.Add(metricStreamName);
return metric;
}
}
}
internal void RecordSingleStreamLongMeasurement(Metric metric, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
metric.UpdateLong(value, tags);
}
internal void RecordSingleStreamDoubleMeasurement(Metric metric, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
metric.UpdateDouble(value, tags);
}
internal List<Metric> AddMetricsListWithViews(Instrument instrument, List<MetricStreamConfiguration> metricStreamConfigs)
{
var maxCountMetricsToBeCreated = metricStreamConfigs.Count;
// Create list with initial capacity as the max metric count.
// Due to duplicate/max limit, we may not end up using them
// all, and that memory is wasted until Meter disposed.
// TODO: Revisit to see if we need to do metrics.TrimExcess()
var metrics = new List<Metric>(maxCountMetricsToBeCreated);
lock (this.instrumentCreationLock)
{
for (int i = 0; i < maxCountMetricsToBeCreated; i++)
{
var metricStreamConfig = metricStreamConfigs[i];
var meterName = instrument.Meter.Name;
var metricName = metricStreamConfig?.Name ?? instrument.Name;
var metricStreamName = $"{meterName}.{metricName}";
if (!MeterProviderBuilderSdk.IsValidInstrumentName(metricName))
{
OpenTelemetrySdkEventSource.Log.MetricInstrumentIgnored(
metricName,
instrument.Meter.Name,
"Metric name is invalid.",
"The name must comply with the OpenTelemetry specification.");
continue;
}
if (this.metricStreamNames.Contains(metricStreamName))
{
// TODO: Log that instrument is ignored
// as the resulting Metric name is conflicting
// with existing name.
continue;
}
if (metricStreamConfig?.Aggregation == Aggregation.Drop)
{
// TODO: Log that instrument is ignored
// as user explicitly asked to drop it
// with View.
continue;
}
var index = ++this.metricIndex;
if (index >= MaxMetrics)
{
// TODO: Log that instrument is ignored
// as max number of Metrics have reached.
}
else
{
Metric metric;
var metricDescription = metricStreamConfig?.Description ?? instrument.Description;
string[] tagKeysInteresting = metricStreamConfig?.TagKeys;
double[] histogramBucketBounds = (metricStreamConfig is ExplicitBucketHistogramConfiguration histogramConfig
&& histogramConfig.Boundaries != null) ? histogramConfig.Boundaries : null;
metric = new Metric(instrument, this.preferredAggregationTemporality, metricName, metricDescription, histogramBucketBounds, tagKeysInteresting);
this.metrics[index] = metric;
metrics.Add(metric);
this.metricStreamNames.Add(metricStreamName);
}
}
return metrics;
}
}
internal void RecordLongMeasurement(List<Metric> metrics, long value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
if (metrics.Count == 1)
{
// special casing the common path
// as this is faster than the
// foreach, when count is 1.
metrics[0].UpdateLong(value, tags);
}
else
{
foreach (var metric in metrics)
{
metric.UpdateLong(value, tags);
}
}
}
internal void RecordDoubleMeasurement(List<Metric> metrics, double value, ReadOnlySpan<KeyValuePair<string, object>> tags)
{
if (metrics.Count == 1)
{
// special casing the common path
// as this is faster than the
// foreach, when count is 1.
metrics[0].UpdateDouble(value, tags);
}
else
{
foreach (var metric in metrics)
{
metric.UpdateDouble(value, tags);
}
}
}
internal void CompleteSingleStreamMeasurement(Metric metric)
{
metric.InstrumentDisposed = true;
}
internal void CompleteMeasurement(List<Metric> metrics)
{
foreach (var metric in metrics)
{
metric.InstrumentDisposed = true;
}
}
private Batch<Metric> GetMetricsBatch()
{
try
{
var indexSnapshot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapshot + 1;
int metricCountCurrentBatch = 0;
for (int i = 0; i < target; i++)
{
var metric = this.metrics[i];
int metricPointSize = 0;
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metricPointSize = metric.Snapshot();
this.metrics[i] = null;
}
else
{
metricPointSize = metric.Snapshot();
}
if (metricPointSize > 0)
{
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
}
}
return (metricCountCurrentBatch > 0) ? new Batch<Metric>(this.metricsCurrentBatch, metricCountCurrentBatch) : default;
}
catch (Exception)
{
// TODO: Log
return default;
}
}
}
}

View File

@ -50,16 +50,6 @@ namespace OpenTelemetry
return Resource.Empty; return Resource.Empty;
} }
public static Func<Batch<Metric>> GetMetricCollect(this BaseProvider baseProvider)
{
if (baseProvider is MeterProviderSdk meterProviderSdk)
{
return meterProviderSdk.Collect;
}
return null;
}
/// <summary> /// <summary>
/// Gets the <see cref="Resource"/> associated with the <see cref="BaseProvider"/>. /// Gets the <see cref="Resource"/> associated with the <see cref="BaseProvider"/>.
/// </summary> /// </summary>
@ -69,5 +59,15 @@ namespace OpenTelemetry
{ {
return ResourceBuilder.CreateDefault().Build(); return ResourceBuilder.CreateDefault().Build();
} }
internal static Action GetObservableInstrumentCollectCallback(this BaseProvider baseProvider)
{
if (baseProvider is MeterProviderSdk meterProviderSdk)
{
return meterProviderSdk.CollectObservableInstruments;
}
return null;
}
} }
} }

View File

@ -690,6 +690,28 @@ namespace OpenTelemetry.Metrics.Tests
Assert.Equal(name, metric.Name); Assert.Equal(name, metric.Name);
} }
[Theory]
[InlineData(false)]
[InlineData(true)]
public void SetupSdkProviderWithNoReader(bool hasViews)
{
// This test ensures that MeterProviderSdk can be set up without any reader
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{hasViews}");
var meterProviderBuilder = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name);
if (hasViews)
{
meterProviderBuilder.AddView("counter", "renamedCounter");
}
using var meterProvider = meterProviderBuilder.Build();
var counter = meter.CreateCounter<long>("counter");
counter.Add(10, new KeyValuePair<string, object>("key", "value"));
}
private static long GetLongSum(List<Metric> metrics) private static long GetLongSum(List<Metric> metrics)
{ {
long sum = 0; long sum = 0;

View File

@ -0,0 +1,119 @@
// <copyright file="MultipleReadersTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using OpenTelemetry.Exporter;
using OpenTelemetry.Tests;
using Xunit;
namespace OpenTelemetry.Metrics.Tests
{
public class MultipleReadersTests
{
[Theory]
[InlineData(AggregationTemporality.Delta, false)]
[InlineData(AggregationTemporality.Delta, true)]
[InlineData(AggregationTemporality.Cumulative, false)]
[InlineData(AggregationTemporality.Cumulative, true)]
public void SdkSupportsMultipleReaders(AggregationTemporality aggregationTemporality, bool hasViews)
{
var exporterdMetricItems1 = new List<Metric>();
using var deltaMetricExporter1 = new InMemoryExporter<Metric>(exporterdMetricItems1);
using var deltaMetricReader1 = new BaseExportingMetricReader(deltaMetricExporter1)
{
PreferredAggregationTemporality = AggregationTemporality.Delta,
};
var exporterdMetricItems2 = new List<Metric>();
using var deltaMetricExporter2 = new InMemoryExporter<Metric>(exporterdMetricItems2);
using var deltaMetricReader2 = new BaseExportingMetricReader(deltaMetricExporter2)
{
PreferredAggregationTemporality = aggregationTemporality,
};
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{aggregationTemporality}.{hasViews}");
var counter = meter.CreateCounter<long>("counter");
int index = 0;
var values = new long[] { 100, 200, 300, 400 };
long GetValue() => values[index++];
var gauge = meter.CreateObservableGauge("gauge", () => GetValue());
var meterProviderBuilder = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddReader(deltaMetricReader1)
.AddReader(deltaMetricReader2);
if (hasViews)
{
meterProviderBuilder.AddView("counter", "renamedCounter");
}
using var meterProvider = meterProviderBuilder.Build();
counter.Add(10, new KeyValuePair<string, object>("key", "value"));
meterProvider.ForceFlush();
Assert.Equal(2, exporterdMetricItems1.Count);
Assert.Equal(2, exporterdMetricItems2.Count);
// Check value exported for Counter
this.AssertLongSumValueForMetric(exporterdMetricItems1[0], 10);
this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 10);
// Check value exported for Gauge
this.AssertLongSumValueForMetric(exporterdMetricItems1[1], 100);
this.AssertLongSumValueForMetric(exporterdMetricItems2[1], 200);
exporterdMetricItems1.Clear();
exporterdMetricItems2.Clear();
counter.Add(15, new KeyValuePair<string, object>("key", "value"));
meterProvider.ForceFlush();
Assert.Equal(2, exporterdMetricItems1.Count);
Assert.Equal(2, exporterdMetricItems2.Count);
// Check value exported for Counter
this.AssertLongSumValueForMetric(exporterdMetricItems1[0], 15);
if (aggregationTemporality == AggregationTemporality.Delta)
{
this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 15);
}
else
{
this.AssertLongSumValueForMetric(exporterdMetricItems2[0], 25);
}
// Check value exported for Gauge
this.AssertLongSumValueForMetric(exporterdMetricItems1[1], 300);
this.AssertLongSumValueForMetric(exporterdMetricItems2[1], 400);
}
private void AssertLongSumValueForMetric(Metric metric, long value)
{
using var metricPoints = metric.GetMetricPoints();
var metricPointsEnumerator = metricPoints.GetEnumerator();
Assert.True(metricPointsEnumerator.MoveNext()); // One MetricPoint is emitted for the Metric
ref var metricPointForFirstExport = ref metricPointsEnumerator.Current;
Assert.Equal(value, metricPointForFirstExport.LongValue);
}
}
}