From 9fed161c3a2ec8b7b2e91b0bb943b869e2e2c434 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 10 Nov 2021 09:43:13 -0800 Subject: [PATCH] Handle instrument disposal (#2585) --- docs/metrics/customizing-the-sdk/Program.cs | 2 +- src/OpenTelemetry/Metrics/MeterProviderSdk.cs | 52 +++++++++++++++++-- src/OpenTelemetry/Metrics/Metric.cs | 3 ++ .../Metrics/MetricAPITest.cs | 49 +++++++++++++++++ 4 files changed, 100 insertions(+), 6 deletions(-) diff --git a/docs/metrics/customizing-the-sdk/Program.cs b/docs/metrics/customizing-the-sdk/Program.cs index 279898177..7e2e9111e 100644 --- a/docs/metrics/customizing-the-sdk/Program.cs +++ b/docs/metrics/customizing-the-sdk/Program.cs @@ -60,7 +60,7 @@ public class Program // turn off the above default. i.e any // instrument which does not match any views // gets dropped. - // .AddView(instrumentName: "*", new MetricStreamConfiguration() { Aggregation = Aggregation.Drop }) + // .AddView(instrumentName: "*", MetricStreamConfiguration.Drop) .AddConsoleExporter() .Build(); diff --git a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs index d39eec328..25344365e 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs @@ -30,6 +30,7 @@ namespace OpenTelemetry.Metrics internal const int MaxMetrics = 1000; internal int ShutdownCount; private readonly Metric[] metrics; + private readonly Metric[] metricsCurrentBatch; private readonly List instrumentations = new List(); private readonly List> viewConfigs; private readonly object collectLock = new object(); @@ -50,6 +51,7 @@ namespace OpenTelemetry.Metrics this.Resource = resource; this.viewConfigs = viewConfigs; this.metrics = new Metric[MaxMetrics]; + this.metricsCurrentBatch = new Metric[MaxMetrics]; AggregationTemporality temporality = AggregationTemporality.Cumulative; @@ -135,7 +137,7 @@ namespace OpenTelemetry.Metrics // which will apply defaults. // Users can turn off this default // by adding a view like below as the last view. - // .AddView(instrumentName: "*", new MetricStreamConfiguration() { Aggregation = Aggregation.Drop }) + // .AddView(instrumentName: "*", MetricStreamConfiguration.Drop) metricStreamConfigs.Add(null); } @@ -217,6 +219,8 @@ namespace OpenTelemetry.Metrics this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); + + this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompleted(instrument, state); } else { @@ -282,9 +286,10 @@ namespace OpenTelemetry.Metrics this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); + + this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompletedSingleStream(instrument, state); } - this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompleted(instrument, state); this.listener.Start(); static Regex GetWildcardRegex(IEnumerable collection) @@ -300,9 +305,31 @@ namespace OpenTelemetry.Metrics internal MetricReader Reader => this.reader; + internal void MeasurementsCompletedSingleStream(Instrument instrument, object state) + { + var metric = state as Metric; + if (metric == null) + { + // TODO: log + return; + } + + metric.InstrumentDisposed = true; + } + internal void MeasurementsCompleted(Instrument instrument, object state) { - Console.WriteLine($"Instrument {instrument.Meter.Name}:{instrument.Name} completed."); + var metrics = state as List; + if (metrics == null) + { + // TODO: log + return; + } + + foreach (var metric in metrics) + { + metric.InstrumentDisposed = true; + } } internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan> tagsRos, object state) @@ -412,12 +439,27 @@ namespace OpenTelemetry.Metrics var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1); var target = indexSnapShot + 1; + int metricCountCurrentBatch = 0; for (int i = 0; i < target; i++) { - this.metrics[i].SnapShot(); + var metric = this.metrics[i]; + if (metric != null) + { + if (metric.InstrumentDisposed) + { + metric.SnapShot(); + this.metrics[i] = null; + } + else + { + metric.SnapShot(); + } + + this.metricsCurrentBatch[metricCountCurrentBatch++] = metric; + } } - return (target > 0) ? new Batch(this.metrics, target) : default; + return (metricCountCurrentBatch > 0) ? new Batch(this.metricsCurrentBatch, metricCountCurrentBatch) : default; } catch (Exception) { diff --git a/src/OpenTelemetry/Metrics/Metric.cs b/src/OpenTelemetry/Metrics/Metric.cs index 53d65c581..50fd75ba8 100644 --- a/src/OpenTelemetry/Metrics/Metric.cs +++ b/src/OpenTelemetry/Metrics/Metric.cs @@ -106,6 +106,7 @@ namespace OpenTelemetry.Metrics this.aggStore = new AggregatorStore(aggType, temporality, histogramBounds ?? DefaultHistogramBounds, tagKeysInteresting); this.Temporality = temporality; + this.InstrumentDisposed = false; } public MetricType MetricType { get; private set; } @@ -120,6 +121,8 @@ namespace OpenTelemetry.Metrics public Meter Meter { get; private set; } + internal bool InstrumentDisposed { get; set; } + public BatchMetricPoint GetMetricPoints() { return this.aggStore.GetMetricPoints(); diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index f3279770a..f28df1911 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -361,6 +361,55 @@ namespace OpenTelemetry.Metrics.Tests } } + [Theory] + [InlineData(AggregationTemporality.Cumulative)] + [InlineData(AggregationTemporality.Delta)] + public void TestInstrumentDisposal(AggregationTemporality temporality) + { + var metricItems = new List(); + var metricExporter = new InMemoryExporter(metricItems); + var metricReader = new BaseExportingMetricReader(metricExporter) + { + PreferredAggregationTemporality = temporality, + }; + + var meter1 = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}.1"); + var meter2 = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}.2"); + var counter1 = meter1.CreateCounter("counterFromMeter1"); + var counter2 = meter2.CreateCounter("counterFromMeter2"); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter1.Name) + .AddMeter(meter2.Name) + .AddReader(metricReader) + .Build(); + + counter1.Add(10, new KeyValuePair("key", "value")); + counter2.Add(10, new KeyValuePair("key", "value")); + + metricReader.Collect(); + Assert.Equal(2, metricItems.Count); + metricItems.Clear(); + + meter1.Dispose(); + + metricReader.Collect(); + Assert.Equal(2, metricItems.Count); + metricItems.Clear(); + + metricReader.Collect(); + Assert.Single(metricItems); + metricItems.Clear(); + + meter2.Dispose(); + + metricReader.Collect(); + Assert.Single(metricItems); + metricItems.Clear(); + + metricReader.Collect(); + Assert.Empty(metricItems); + } + [Theory] [InlineData(AggregationTemporality.Cumulative)] [InlineData(AggregationTemporality.Delta)]