Handle instrument disposal (#2585)

This commit is contained in:
Cijo Thomas 2021-11-10 09:43:13 -08:00 committed by GitHub
parent bb2b743fc8
commit 9fed161c3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 6 deletions

View File

@ -60,7 +60,7 @@ public class Program
// turn off the above default. i.e any // turn off the above default. i.e any
// instrument which does not match any views // instrument which does not match any views
// gets dropped. // gets dropped.
// .AddView(instrumentName: "*", new MetricStreamConfiguration() { Aggregation = Aggregation.Drop }) // .AddView(instrumentName: "*", MetricStreamConfiguration.Drop)
.AddConsoleExporter() .AddConsoleExporter()
.Build(); .Build();

View File

@ -30,6 +30,7 @@ namespace OpenTelemetry.Metrics
internal const int MaxMetrics = 1000; internal const int MaxMetrics = 1000;
internal int ShutdownCount; internal int ShutdownCount;
private readonly Metric[] metrics; private readonly Metric[] metrics;
private readonly Metric[] metricsCurrentBatch;
private readonly List<object> instrumentations = new List<object>(); private readonly List<object> instrumentations = new List<object>();
private readonly List<Func<Instrument, MetricStreamConfiguration>> viewConfigs; private readonly List<Func<Instrument, MetricStreamConfiguration>> viewConfigs;
private readonly object collectLock = new object(); private readonly object collectLock = new object();
@ -50,6 +51,7 @@ namespace OpenTelemetry.Metrics
this.Resource = resource; this.Resource = resource;
this.viewConfigs = viewConfigs; this.viewConfigs = viewConfigs;
this.metrics = new Metric[MaxMetrics]; this.metrics = new Metric[MaxMetrics];
this.metricsCurrentBatch = new Metric[MaxMetrics];
AggregationTemporality temporality = AggregationTemporality.Cumulative; AggregationTemporality temporality = AggregationTemporality.Cumulative;
@ -135,7 +137,7 @@ namespace OpenTelemetry.Metrics
// which will apply defaults. // which will apply defaults.
// Users can turn off this default // Users can turn off this default
// by adding a view like below as the last view. // by adding a view like below as the last view.
// .AddView(instrumentName: "*", new MetricStreamConfiguration() { Aggregation = Aggregation.Drop }) // .AddView(instrumentName: "*", MetricStreamConfiguration.Drop)
metricStreamConfigs.Add(null); metricStreamConfigs.Add(null);
} }
@ -217,6 +219,8 @@ namespace OpenTelemetry.Metrics
this.listener.SetMeasurementEventCallback<int>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<int>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state));
this.listener.SetMeasurementEventCallback<short>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<short>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state));
this.listener.SetMeasurementEventCallback<byte>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<byte>((instrument, value, tags, state) => this.MeasurementRecordedLong(instrument, value, tags, state));
this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompleted(instrument, state);
} }
else else
{ {
@ -282,9 +286,10 @@ namespace OpenTelemetry.Metrics
this.listener.SetMeasurementEventCallback<int>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<int>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state));
this.listener.SetMeasurementEventCallback<short>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<short>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state));
this.listener.SetMeasurementEventCallback<byte>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state)); this.listener.SetMeasurementEventCallback<byte>((instrument, value, tags, state) => this.MeasurementRecordedLongSingleStream(instrument, value, tags, state));
this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompletedSingleStream(instrument, state);
} }
this.listener.MeasurementsCompleted = (instrument, state) => this.MeasurementsCompleted(instrument, state);
this.listener.Start(); this.listener.Start();
static Regex GetWildcardRegex(IEnumerable<string> collection) static Regex GetWildcardRegex(IEnumerable<string> collection)
@ -300,9 +305,31 @@ namespace OpenTelemetry.Metrics
internal MetricReader Reader => this.reader; internal MetricReader Reader => this.reader;
internal void MeasurementsCompletedSingleStream(Instrument instrument, object state)
{
var metric = state as Metric;
if (metric == null)
{
// TODO: log
return;
}
metric.InstrumentDisposed = true;
}
internal void MeasurementsCompleted(Instrument instrument, object state) internal void MeasurementsCompleted(Instrument instrument, object state)
{ {
Console.WriteLine($"Instrument {instrument.Meter.Name}:{instrument.Name} completed."); var metrics = state as List<Metric>;
if (metrics == null)
{
// TODO: log
return;
}
foreach (var metric in metrics)
{
metric.InstrumentDisposed = true;
}
} }
internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state) internal void MeasurementRecordedDouble(Instrument instrument, double value, ReadOnlySpan<KeyValuePair<string, object>> tagsRos, object state)
@ -412,12 +439,27 @@ namespace OpenTelemetry.Metrics
var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1); var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapShot + 1; var target = indexSnapShot + 1;
int metricCountCurrentBatch = 0;
for (int i = 0; i < target; i++) for (int i = 0; i < target; i++)
{ {
this.metrics[i].SnapShot(); var metric = this.metrics[i];
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metric.SnapShot();
this.metrics[i] = null;
}
else
{
metric.SnapShot();
}
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
} }
return (target > 0) ? new Batch<Metric>(this.metrics, target) : default; return (metricCountCurrentBatch > 0) ? new Batch<Metric>(this.metricsCurrentBatch, metricCountCurrentBatch) : default;
} }
catch (Exception) catch (Exception)
{ {

View File

@ -106,6 +106,7 @@ namespace OpenTelemetry.Metrics
this.aggStore = new AggregatorStore(aggType, temporality, histogramBounds ?? DefaultHistogramBounds, tagKeysInteresting); this.aggStore = new AggregatorStore(aggType, temporality, histogramBounds ?? DefaultHistogramBounds, tagKeysInteresting);
this.Temporality = temporality; this.Temporality = temporality;
this.InstrumentDisposed = false;
} }
public MetricType MetricType { get; private set; } public MetricType MetricType { get; private set; }
@ -120,6 +121,8 @@ namespace OpenTelemetry.Metrics
public Meter Meter { get; private set; } public Meter Meter { get; private set; }
internal bool InstrumentDisposed { get; set; }
public BatchMetricPoint GetMetricPoints() public BatchMetricPoint GetMetricPoints()
{ {
return this.aggStore.GetMetricPoints(); return this.aggStore.GetMetricPoints();

View File

@ -361,6 +361,55 @@ namespace OpenTelemetry.Metrics.Tests
} }
} }
[Theory]
[InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)]
public void TestInstrumentDisposal(AggregationTemporality temporality)
{
var metricItems = new List<Metric>();
var metricExporter = new InMemoryExporter<Metric>(metricItems);
var metricReader = new BaseExportingMetricReader(metricExporter)
{
PreferredAggregationTemporality = temporality,
};
var meter1 = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}.1");
var meter2 = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}.2");
var counter1 = meter1.CreateCounter<long>("counterFromMeter1");
var counter2 = meter2.CreateCounter<long>("counterFromMeter2");
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter1.Name)
.AddMeter(meter2.Name)
.AddReader(metricReader)
.Build();
counter1.Add(10, new KeyValuePair<string, object>("key", "value"));
counter2.Add(10, new KeyValuePair<string, object>("key", "value"));
metricReader.Collect();
Assert.Equal(2, metricItems.Count);
metricItems.Clear();
meter1.Dispose();
metricReader.Collect();
Assert.Equal(2, metricItems.Count);
metricItems.Clear();
metricReader.Collect();
Assert.Single(metricItems);
metricItems.Clear();
meter2.Dispose();
metricReader.Collect();
Assert.Single(metricItems);
metricItems.Clear();
metricReader.Collect();
Assert.Empty(metricItems);
}
[Theory] [Theory]
[InlineData(AggregationTemporality.Cumulative)] [InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)] [InlineData(AggregationTemporality.Delta)]