Delta exporters will Export only those points which received new update (#2629)

This commit is contained in:
Cijo Thomas 2021-11-17 11:04:14 -08:00 committed by GitHub
parent 5325185d02
commit ac98506ed4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 198 additions and 53 deletions

View File

@ -2,6 +2,9 @@
## Unreleased
* Metrics SDK will not provide inactive Metrics to delta exporter.
([#2629](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2629))
* Histogram bounds are validated when added to a View.
([#2573](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2573))

View File

@ -37,11 +37,13 @@ namespace OpenTelemetry.Metrics
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metricPoints;
private readonly int[] currentMetricPointBatch;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private int batchSize = 0;
private bool zeroTagMetricPointInitialized;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
@ -53,6 +55,7 @@ namespace OpenTelemetry.Metrics
string[] tagKeysInteresting = null)
{
this.metricPoints = new MetricPoint[MaxMetricPoints];
this.currentMetricPointBatch = new int[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
@ -92,10 +95,46 @@ namespace OpenTelemetry.Metrics
this.updateDoubleCallback(value, tags);
}
internal void SnapShot()
internal int SnapShot()
{
this.batchSize = 0;
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
if (this.temporality == AggregationTemporality.Delta)
{
this.SnapShotDelta(indexSnapShot);
}
else
{
this.SnapShotCumulative(indexSnapShot);
}
this.endTimeInclusive = DateTimeOffset.UtcNow;
return this.batchSize;
}
internal void SnapShotDelta(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
ref var metricPoint = ref this.metricPoints[i];
if (metricPoint.MetricPointStatus == MetricPointStatus.NoCollectPending)
{
continue;
}
metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}
if (this.endTimeInclusive != default)
{
this.startTimeExclusive = this.endTimeInclusive;
}
}
internal void SnapShotCumulative(int indexSnapShot)
{
for (int i = 0; i <= indexSnapShot; i++)
{
ref var metricPoint = ref this.metricPoints[i];
@ -105,24 +144,14 @@ namespace OpenTelemetry.Metrics
}
metricPoint.TakeSnapShot(this.outputDelta);
this.currentMetricPointBatch[this.batchSize] = i;
this.batchSize++;
}
if (this.temporality == AggregationTemporality.Delta)
{
if (this.endTimeInclusive != default)
{
this.startTimeExclusive = this.endTimeInclusive;
}
}
DateTimeOffset dt = DateTimeOffset.UtcNow;
this.endTimeInclusive = dt;
}
internal BatchMetricPoint GetMetricPoints()
{
var indexSnapShot = Math.Min(this.metricPointIndex, MaxMetricPoints - 1);
return new BatchMetricPoint(this.metricPoints, indexSnapShot + 1, this.startTimeExclusive, this.endTimeInclusive);
return new BatchMetricPoint(this.metricPoints, this.currentMetricPointBatch, this.batchSize, this.startTimeExclusive, this.endTimeInclusive);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]

View File

@ -16,7 +16,6 @@
using System;
using System.Collections;
using System.Diagnostics;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Metrics
@ -24,17 +23,18 @@ namespace OpenTelemetry.Metrics
public readonly struct BatchMetricPoint : IDisposable
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly long targetCount;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;
internal BatchMetricPoint(MetricPoint[] metricsPoints, int maxSize, DateTimeOffset start, DateTimeOffset end)
internal BatchMetricPoint(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(metricsPoints, nameof(metricsPoints));
this.metricsPoints = metricsPoints;
this.targetCount = maxSize;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.start = start;
this.end = end;
}
@ -50,7 +50,7 @@ namespace OpenTelemetry.Metrics
/// <returns><see cref="Enumerator"/>.</returns>
public Enumerator GetEnumerator()
{
return new Enumerator(this.metricsPoints, this.targetCount, this.start, this.end);
return new Enumerator(this.metricsPoints, this.metricPointsToProcess, this.targetCount, this.start, this.end);
}
/// <summary>
@ -59,14 +59,16 @@ namespace OpenTelemetry.Metrics
public struct Enumerator : IEnumerator
{
private readonly MetricPoint[] metricsPoints;
private readonly int[] metricPointsToProcess;
private readonly DateTimeOffset start;
private readonly DateTimeOffset end;
private long targetCount;
private long index;
internal Enumerator(MetricPoint[] metricsPoints, long targetCount, DateTimeOffset start, DateTimeOffset end)
internal Enumerator(MetricPoint[] metricsPoints, int[] metricPointsToProcess, long targetCount, DateTimeOffset start, DateTimeOffset end)
{
this.metricsPoints = metricsPoints;
this.metricPointsToProcess = metricPointsToProcess;
this.targetCount = targetCount;
this.index = -1;
this.start = start;
@ -77,7 +79,7 @@ namespace OpenTelemetry.Metrics
{
get
{
return ref this.metricsPoints[this.index];
return ref this.metricsPoints[this.metricPointsToProcess[this.index]];
}
}
@ -93,12 +95,7 @@ namespace OpenTelemetry.Metrics
{
while (++this.index < this.targetCount)
{
ref var metricPoint = ref this.metricsPoints[this.index];
if (metricPoint.StartTime == default)
{
continue;
}
ref var metricPoint = ref this.metricsPoints[this.metricPointsToProcess[this.index]];
metricPoint.StartTime = this.start;
metricPoint.EndTime = this.end;
return true;

View File

@ -443,19 +443,23 @@ namespace OpenTelemetry.Metrics
for (int i = 0; i < target; i++)
{
var metric = this.metrics[i];
int metricPointSize = 0;
if (metric != null)
{
if (metric.InstrumentDisposed)
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
this.metrics[i] = null;
}
else
{
metric.SnapShot();
metricPointSize = metric.SnapShot();
}
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
if (metricPointSize > 0)
{
this.metricsCurrentBatch[metricCountCurrentBatch++] = metric;
}
}
}

View File

@ -138,9 +138,9 @@ namespace OpenTelemetry.Metrics
this.aggStore.Update(value, tags);
}
internal void SnapShot()
internal int SnapShot()
{
this.aggStore.SnapShot();
return this.aggStore.SnapShot();
}
}
}

View File

@ -46,6 +46,7 @@ namespace OpenTelemetry.Metrics
this.DoubleValue = default;
this.doubleVal = default;
this.lastDoubleSum = default;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
if (this.AggType == AggregationType.Histogram)
{
@ -86,6 +87,8 @@ namespace OpenTelemetry.Metrics
public double[] ExplicitBounds { get; internal set; }
internal MetricPointStatus MetricPointStatus { get; private set; }
private readonly AggregationType AggType { get; }
internal void Update(long number)
@ -117,6 +120,19 @@ namespace OpenTelemetry.Metrics
break;
}
}
// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
internal void Update(double number)
@ -180,6 +196,19 @@ namespace OpenTelemetry.Metrics
break;
}
}
// There is a race with Snapshot:
// Update() updates the value
// Snapshot snapshots the value
// Snapshot sets status to NoCollectPending
// Update sets status to CollectPending -- this is not right as the Snapshot
// already included the updated value.
// In the absence of any new Update call until next Snapshot,
// this results in exporting an Update even though
// it had no update.
// TODO: For Delta, this can be mitigated
// by ignoring Zero points
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
internal void TakeSnapShot(bool outputDelta)
@ -194,6 +223,14 @@ namespace OpenTelemetry.Metrics
long initValue = Interlocked.Read(ref this.longVal);
this.LongValue = initValue - this.lastLongSum;
this.lastLongSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
@ -216,6 +253,14 @@ namespace OpenTelemetry.Metrics
double initValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.DoubleValue = initValue - this.lastDoubleSum;
this.lastDoubleSum = initValue;
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (initValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
}
else
{
@ -233,6 +278,15 @@ namespace OpenTelemetry.Metrics
case AggregationType.LongGauge:
{
this.LongValue = Interlocked.Read(ref this.longVal);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.LongValue != Interlocked.Read(ref this.longVal))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
break;
}
@ -244,6 +298,15 @@ namespace OpenTelemetry.Metrics
// the exchange (to 0.0) will never occur,
// but we get the original value atomically.
this.DoubleValue = Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity);
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
// Check again if value got updated, if yes reset status.
// This ensures no Updates get Lost.
if (this.DoubleValue != Interlocked.CompareExchange(ref this.doubleVal, 0.0, double.NegativeInfinity))
{
this.MetricPointStatus = MetricPointStatus.CollectPending;
}
break;
}
@ -267,6 +330,8 @@ namespace OpenTelemetry.Metrics
this.bucketCounts[i] = 0;
}
}
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}
break;
@ -283,6 +348,8 @@ namespace OpenTelemetry.Metrics
this.longVal = 0;
this.doubleVal = 0;
}
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
}
break;

View File

@ -0,0 +1,33 @@
// <copyright file="MetricPointStatus.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.Metrics
{
internal enum MetricPointStatus
{
/// <summary>
/// This status is applied to <see cref="MetricPoint"/>s with status <see cref="CollectPending"/> after a Collect.
/// If an update occurs, status will be moved to <see cref="CollectPending"/>.
/// </summary>
NoCollectPending,
/// <summary>
/// The <see cref="MetricPoint"/> has been updated since the previous Collect cycle.
/// Collect will move it to <see cref="NoCollectPending"/>.
/// </summary>
CollectPending,
}
}

View File

@ -180,14 +180,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
}
else
{
Assert.Single(requestMetrics);
var metricPoints = new List<MetricPoint>();
foreach (var p in requestMetrics[0].GetMetricPoints())
{
metricPoints.Add(p);
}
Assert.Empty(metricPoints);
Assert.Empty(requestMetrics);
}
}

View File

@ -24,7 +24,7 @@ namespace OpenTelemetry.Metrics.Tests
{
public class MemoryEfficiencyTests
{
[Theory(Skip = "To be run after https://github.com/open-telemetry/opentelemetry-dotnet/issues/2524 is fixed")]
[Theory]
[InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)]
public void ExportOnlyWhenPointChanged(AggregationTemporality temporality)
@ -50,7 +50,14 @@ namespace OpenTelemetry.Metrics.Tests
exportedItems.Clear();
meterProvider.ForceFlush();
Assert.Empty(exportedItems);
if (temporality == AggregationTemporality.Cumulative)
{
Assert.Single(exportedItems);
}
else
{
Assert.Empty(exportedItems);
}
}
}
}

View File

@ -85,7 +85,7 @@ namespace OpenTelemetry.Metrics.Tests
meter.CreateObservableGauge<long>("myBadGauge", observeValues: () => throw new Exception("gauge read error"));
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Equal(2, exportedItems.Count);
Assert.Single(exportedItems);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
@ -99,16 +99,6 @@ namespace OpenTelemetry.Metrics.Tests
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);
metric = exportedItems[1];
Assert.Equal("myBadGauge", metric.Name);
metricPoints.Clear();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}
Assert.Empty(metricPoints);
}
[Theory]
@ -142,6 +132,7 @@ namespace OpenTelemetry.Metrics.Tests
// Metric stream will remain one.
var anotherCounterSameName = meter1.CreateCounter<long>("name1");
anotherCounterSameName.Add(10);
counterLong.Add(10);
metricItems.Clear();
metricReader.Collect();
Assert.Single(metricItems);
@ -151,6 +142,7 @@ namespace OpenTelemetry.Metrics.Tests
// (the Meter name is not part of stream name)
var anotherCounterSameNameDiffMeter = meter2.CreateCounter<long>("name1");
anotherCounterSameNameDiffMeter.Add(10);
counterLong.Add(10);
metricItems.Clear();
metricReader.Collect();
Assert.Single(metricItems);
@ -390,22 +382,30 @@ namespace OpenTelemetry.Metrics.Tests
Assert.Equal(2, metricItems.Count);
metricItems.Clear();
counter1.Add(10, new KeyValuePair<string, object>("key", "value"));
counter2.Add(10, new KeyValuePair<string, object>("key", "value"));
meter1.Dispose();
metricReader.Collect();
Assert.Equal(2, metricItems.Count);
metricItems.Clear();
counter1.Add(10, new KeyValuePair<string, object>("key", "value"));
counter2.Add(10, new KeyValuePair<string, object>("key", "value"));
metricReader.Collect();
Assert.Single(metricItems);
metricItems.Clear();
counter1.Add(10, new KeyValuePair<string, object>("key", "value"));
counter2.Add(10, new KeyValuePair<string, object>("key", "value"));
meter2.Dispose();
metricReader.Collect();
Assert.Single(metricItems);
metricItems.Clear();
counter1.Add(10, new KeyValuePair<string, object>("key", "value"));
counter2.Add(10, new KeyValuePair<string, object>("key", "value"));
metricReader.Collect();
Assert.Empty(metricItems);
}
@ -458,9 +458,21 @@ namespace OpenTelemetry.Metrics.Tests
Assert.Equal(AggregatorStore.MaxMetricPoints, MetricPointCount());
metricItems.Clear();
counterLong.Add(10);
for (int i = 0; i < AggregatorStore.MaxMetricPoints + 1; i++)
{
counterLong.Add(10, new KeyValuePair<string, object>("key", "value" + i));
}
metricReader.Collect();
Assert.Equal(AggregatorStore.MaxMetricPoints, MetricPointCount());
counterLong.Add(10);
for (int i = 0; i < AggregatorStore.MaxMetricPoints + 1; i++)
{
counterLong.Add(10, new KeyValuePair<string, object>("key", "value" + i));
}
// These updates would be dropped.
counterLong.Add(10, new KeyValuePair<string, object>("key", "valueA"));
counterLong.Add(10, new KeyValuePair<string, object>("key", "valueB"));