When collecting measures, do not emit "empty" metrics (i.e. Int64SummaryData where count=0). (#1691)

* Fixes #1666.

# Changes
When collecting measures, do not emit "empty" metrics (i.e. Int64SummaryData where count=0).

* Fix CR/LF warnings

* Add HasData() to Aggregator class

* Add HasData to Aggregator class

* Refactor Method name to be clearer

* Refactor Method name to be clearer

* Update PublicAPI.*.txt files

* Fix flaky test due to thread-safety issues.

* Fix Lint issues

* Fix Unit Test

* Fix Unit Test

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
This commit is contained in:
Victor Lu 2021-01-27 19:33:37 -08:00 committed by GitHub
parent 92948e60f9
commit 3c3a7c7460
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 164 additions and 32 deletions

View File

@ -176,7 +176,6 @@ OpenTelemetry.Trace.TracerProviderExtensions
abstract OpenTelemetry.BaseExporter<T>.Export(in OpenTelemetry.Batch<T> batch) -> OpenTelemetry.ExportResult
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.GetAggregationType() -> OpenTelemetry.Metrics.Export.AggregationType
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.ToMetricData() -> OpenTelemetry.Metrics.Export.MetricData
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void
abstract OpenTelemetry.Metrics.BoundCounterMetricSdkBase<T>.GetAggregator() -> OpenTelemetry.Metrics.Aggregators.Aggregator<T>
abstract OpenTelemetry.Metrics.Export.MetricExporter.ExportAsync(System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<OpenTelemetry.Metrics.Export.MetricExporter.ExportResult>
abstract OpenTelemetry.Metrics.Export.MetricProcessor.FinishCollectionCycle(out System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics) -> void
@ -261,3 +260,5 @@ virtual OpenTelemetry.BaseProcessor<T>.OnForceFlush(int timeoutMilliseconds) ->
virtual OpenTelemetry.BaseProcessor<T>.OnShutdown(int timeoutMilliseconds) -> bool
virtual OpenTelemetry.BaseProcessor<T>.OnStart(T data) -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Checkpoint() -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.HasCheckpointData() -> bool
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void

View File

@ -176,7 +176,6 @@ OpenTelemetry.Trace.TracerProviderExtensions
abstract OpenTelemetry.BaseExporter<T>.Export(in OpenTelemetry.Batch<T> batch) -> OpenTelemetry.ExportResult
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.GetAggregationType() -> OpenTelemetry.Metrics.Export.AggregationType
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.ToMetricData() -> OpenTelemetry.Metrics.Export.MetricData
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void
abstract OpenTelemetry.Metrics.BoundCounterMetricSdkBase<T>.GetAggregator() -> OpenTelemetry.Metrics.Aggregators.Aggregator<T>
abstract OpenTelemetry.Metrics.Export.MetricExporter.ExportAsync(System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<OpenTelemetry.Metrics.Export.MetricExporter.ExportResult>
abstract OpenTelemetry.Metrics.Export.MetricProcessor.FinishCollectionCycle(out System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics) -> void
@ -261,3 +260,5 @@ virtual OpenTelemetry.BaseProcessor<T>.OnForceFlush(int timeoutMilliseconds) ->
virtual OpenTelemetry.BaseProcessor<T>.OnShutdown(int timeoutMilliseconds) -> bool
virtual OpenTelemetry.BaseProcessor<T>.OnStart(T data) -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Checkpoint() -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.HasCheckpointData() -> bool
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void

View File

@ -199,7 +199,6 @@ OpenTelemetry.Trace.TracerProviderExtensions
abstract OpenTelemetry.BaseExporter<T>.Export(in OpenTelemetry.Batch<T> batch) -> OpenTelemetry.ExportResult
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.GetAggregationType() -> OpenTelemetry.Metrics.Export.AggregationType
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.ToMetricData() -> OpenTelemetry.Metrics.Export.MetricData
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void
abstract OpenTelemetry.Metrics.BoundCounterMetricSdkBase<T>.GetAggregator() -> OpenTelemetry.Metrics.Aggregators.Aggregator<T>
abstract OpenTelemetry.Metrics.Export.MetricExporter.ExportAsync(System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<OpenTelemetry.Metrics.Export.MetricExporter.ExportResult>
abstract OpenTelemetry.Metrics.Export.MetricProcessor.FinishCollectionCycle(out System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics) -> void
@ -286,3 +285,5 @@ virtual OpenTelemetry.BaseProcessor<T>.OnShutdown(int timeoutMilliseconds) -> bo
virtual OpenTelemetry.BaseProcessor<T>.OnStart(T data) -> void
virtual OpenTelemetry.Logs.OpenTelemetryLoggerProvider.Dispose(bool disposing) -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Checkpoint() -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.HasCheckpointData() -> bool
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void

View File

@ -199,7 +199,6 @@ OpenTelemetry.Trace.TracerProviderExtensions
abstract OpenTelemetry.BaseExporter<T>.Export(in OpenTelemetry.Batch<T> batch) -> OpenTelemetry.ExportResult
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.GetAggregationType() -> OpenTelemetry.Metrics.Export.AggregationType
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.ToMetricData() -> OpenTelemetry.Metrics.Export.MetricData
abstract OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void
abstract OpenTelemetry.Metrics.BoundCounterMetricSdkBase<T>.GetAggregator() -> OpenTelemetry.Metrics.Aggregators.Aggregator<T>
abstract OpenTelemetry.Metrics.Export.MetricExporter.ExportAsync(System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<OpenTelemetry.Metrics.Export.MetricExporter.ExportResult>
abstract OpenTelemetry.Metrics.Export.MetricProcessor.FinishCollectionCycle(out System.Collections.Generic.IEnumerable<OpenTelemetry.Metrics.Export.Metric> metrics) -> void
@ -286,3 +285,5 @@ virtual OpenTelemetry.BaseProcessor<T>.OnShutdown(int timeoutMilliseconds) -> bo
virtual OpenTelemetry.BaseProcessor<T>.OnStart(T data) -> void
virtual OpenTelemetry.Logs.OpenTelemetryLoggerProvider.Dispose(bool disposing) -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Checkpoint() -> void
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.HasCheckpointData() -> bool
virtual OpenTelemetry.Metrics.Aggregators.Aggregator<T>.Update(T value) -> void

View File

@ -28,19 +28,23 @@ namespace OpenTelemetry.Metrics.Aggregators
public abstract class Aggregator<T>
where T : struct
{
private long startTimeTicks;
private long checkpointStartTimeTicks;
private AggState active;
private AggState checkpoint;
protected Aggregator()
{
this.startTimeTicks = DateTimeOffset.UtcNow.Ticks;
this.checkpoint = new AggState();
this.active = new AggState();
}
/// <summary>
/// Adds value to the running total in a thread safe manner.
/// </summary>
/// <param name="value">Value to be aggregated.</param>
public abstract void Update(T value);
public virtual void Update(T value)
{
this.active.Increment();
}
/// <summary>
/// Checkpoints the current aggregate data, and resets the state.
@ -48,11 +52,20 @@ namespace OpenTelemetry.Metrics.Aggregators
public virtual void Checkpoint()
{
// checkpoints the start time for the current aggregation, and sets the new start time.
this.checkpointStartTimeTicks = Interlocked.Exchange(ref this.startTimeTicks, DateTimeOffset.UtcNow.Ticks);
this.checkpoint = Interlocked.Exchange(ref this.active, new AggState());
}
/// <summary>
/// Convert Aggregator data to MetricData.
/// Check if checkpoint has any aggregated data.
/// </summary>
/// <returns>true if data was presented to aggregator.</returns>
public virtual bool HasCheckpointData()
{
return this.checkpoint.Count > 0;
}
/// <summary>
/// Convert checkpoint aggregator data to MetricData.
/// </summary>
/// <returns>An instance of <see cref="MetricData"/> representing the currently aggregated value.</returns>
public abstract MetricData ToMetricData();
@ -69,7 +82,7 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <returns>The end timestamp of the last aggregated checkpoint.</returns>
protected DateTimeOffset GetLastEndTimestamp()
{
return new DateTimeOffset(this.startTimeTicks, TimeSpan.Zero).Subtract(TimeSpan.FromTicks(1));
return new DateTimeOffset(this.active.StartTimeTicks, TimeSpan.Zero).Subtract(TimeSpan.FromTicks(1));
}
/// <summary>
@ -78,7 +91,26 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <returns>The start timestamp of the last aggregated checkpoint.</returns>
protected DateTimeOffset GetLastStartTimestamp()
{
return new DateTimeOffset(this.checkpointStartTimeTicks, TimeSpan.Zero);
return new DateTimeOffset(this.checkpoint.StartTimeTicks, TimeSpan.Zero);
}
private class AggState
{
private long count = 0;
public AggState()
{
this.StartTimeTicks = DateTimeOffset.UtcNow.Ticks;
}
public long Count { get => this.count; }
public long StartTimeTicks { get; }
public void Increment()
{
Interlocked.Increment(ref this.count);
}
}
}
}

View File

@ -57,6 +57,8 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <inheritdoc/>
public override void Update(double value)
{
base.Update(value);
// Adds value to the running total in a thread safe manner.
double initialTotal, computedTotal;
do

View File

@ -56,6 +56,7 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <inheritdoc/>
public override void Update(double newValue)
{
base.Update(newValue);
Interlocked.Exchange(ref this.value, newValue);
}
}

View File

@ -62,6 +62,7 @@ namespace OpenTelemetry.Metrics.Aggregators
{
lock (this.updateLock)
{
base.Update(value);
this.summary.Count++;
this.summary.Sum += value;
this.summary.Max = Math.Max(this.summary.Max, value);

View File

@ -57,6 +57,8 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <inheritdoc/>
public override void Update(long value)
{
base.Update(value);
// Adds value to the running total in a thread safe manner.
Interlocked.Add(ref this.sum, value);
}

View File

@ -56,6 +56,7 @@ namespace OpenTelemetry.Metrics.Aggregators
/// <inheritdoc/>
public override void Update(long newValue)
{
base.Update(newValue);
Interlocked.Exchange(ref this.value, newValue);
}
}

View File

@ -62,6 +62,7 @@ namespace OpenTelemetry.Metrics.Aggregators
{
lock (this.updateLock)
{
base.Update(value);
this.summary.Count++;
this.summary.Sum += value;
this.summary.Max = Math.Max(this.summary.Max, value);

View File

@ -64,9 +64,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
// Updates so far are pushed to Processor/Exporter.
// Adjust status accordinly.
@ -111,9 +114,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
// Updates so far are pushed to Processor/Exporter.
// Adjust status accordinly.
@ -158,9 +164,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
}
this.metricProcessor.Process(metric);
@ -176,9 +185,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
}
this.metricProcessor.Process(metric);
@ -204,9 +216,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
}
this.metricProcessor.Process(metric);
@ -232,9 +247,12 @@ namespace OpenTelemetry.Metrics
var labelSet = handle.Key;
var aggregator = handle.Value.GetAggregator();
aggregator.Checkpoint();
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
if (aggregator.HasCheckpointData())
{
var metricData = aggregator.ToMetricData();
metricData.Labels = labelSet.Labels;
metric.Data.Add(metricData);
}
}
this.metricProcessor.Process(metric);

View File

@ -14,6 +14,7 @@
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Linq;
using OpenTelemetry.Metrics.Export;
@ -183,6 +184,70 @@ namespace OpenTelemetry.Metrics.Tests
Assert.Equal(300.5, metricLong.Sum);
}
[Fact]
public void ExportNonEmptyLongMeasure()
{
var testProcessor = new TestMetricProcessor();
var testExporter = new TestMetricExporter(null);
var meter = Sdk.CreateMeterProviderBuilder()
.SetProcessor(testProcessor)
.SetExporter(testExporter)
.SetPushInterval(TimeSpan.FromMilliseconds(100))
.Build()
.GetMeter("library1") as MeterSdk;
var testMeasure = meter.CreateInt64Measure("testMeasure");
var context = default(SpanContext);
// These 2 measures should be summed into 1 record.
testMeasure.Record(context, 100, LabelSet.BlankLabelSet);
testMeasure.Record(context, 200, LabelSet.BlankLabelSet);
// Let collect/export run multiple time. Including when no measures are recorded.
System.Threading.Thread.Sleep(1000);
int recordCount = testExporter.Metrics.Count;
int dataCount = testExporter.Metrics.Select(k => k.Data.Count).Sum();
long measureCount = testExporter.Metrics.Select(k => k.Data.Select(j => ((Int64SummaryData)j).Count).Sum()).Sum();
Assert.True(recordCount >= 1);
Assert.Equal(1, dataCount);
Assert.Equal(2, measureCount);
}
[Fact]
public void ExportNonEmptyDoubleMeasure()
{
var testProcessor = new TestMetricProcessor();
var testExporter = new TestMetricExporter(null);
var meter = Sdk.CreateMeterProviderBuilder()
.SetProcessor(testProcessor)
.SetExporter(testExporter)
.SetPushInterval(TimeSpan.FromMilliseconds(100))
.Build()
.GetMeter("library1") as MeterSdk;
var testMeasure = meter.CreateDoubleMeasure("testMeasure");
var context = default(SpanContext);
// These 2 measures should be summed into 1 record.
testMeasure.Record(context, 100.1, LabelSet.BlankLabelSet);
testMeasure.Record(context, 200.2, LabelSet.BlankLabelSet);
// Let collect/export run multiple time. Including when no measures are recorded.
System.Threading.Thread.Sleep(1000);
int recordCount = testExporter.Metrics.Count;
int dataCount = testExporter.Metrics.Select(k => k.Data.Count).Sum();
long measureCount = testExporter.Metrics.Select(k => k.Data.Select(j => ((DoubleSummaryData)j).Count).Sum()).Sum();
Assert.True(recordCount >= 1);
Assert.Equal(1, dataCount);
Assert.Equal(2, measureCount);
}
private void TestCallbackLong(Int64ObserverMetric observerMetric)
{
var labels1 = new List<KeyValuePair<string, string>>();

View File

@ -15,6 +15,7 @@
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@ -24,7 +25,7 @@ namespace OpenTelemetry.Metrics.Tests
{
internal class TestMetricExporter : MetricExporter
{
public List<Metric> Metrics = new List<Metric>();
public ConcurrentQueue<Metric> Metrics = new ConcurrentQueue<Metric>();
private readonly Action onExport;
public TestMetricExporter(Action onExport)
@ -35,7 +36,11 @@ namespace OpenTelemetry.Metrics.Tests
public override Task<ExportResult> ExportAsync(IEnumerable<Metric> metrics, CancellationToken cancellationToken)
{
this.onExport?.Invoke();
this.Metrics.AddRange(metrics);
foreach (var metric in metrics)
{
this.Metrics.Enqueue(metric);
}
return Task.FromResult(ExportResult.Success);
}
}