Handle exception from observable instruments (#2457)

This commit is contained in:
Cijo Thomas 2021-10-06 13:06:35 -07:00 committed by GitHub
parent 3d16cd8d39
commit dd6b11bafa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 114 additions and 13 deletions

View File

@ -2,6 +2,9 @@
## Unreleased
* Exception from Observable instrument callbacks does not
result in entire metrics being lost.
* SDK is allocation-free on recording of measurements with
upto 8 tags.

View File

@ -55,11 +55,21 @@ namespace OpenTelemetry.Internal
}
[NonEvent]
public void MetricObserverCallbackException(string metricName, Exception ex)
public void MetricObserverCallbackException(Exception exception)
{
if (this.IsEnabled(EventLevel.Warning, EventKeywords.All))
{
this.MetricObserverCallbackError(metricName, ex.ToInvariantString());
if (exception is AggregateException aggregateException)
{
foreach (var ex in aggregateException.InnerExceptions)
{
this.ObservableInstrumentCallbackException(ex.ToInvariantString());
}
}
else
{
this.ObservableInstrumentCallbackException(exception.ToInvariantString());
}
}
}
@ -238,10 +248,10 @@ namespace OpenTelemetry.Internal
this.WriteEvent(15, spanName);
}
[Event(16, Message = "Exception occurring while invoking Metric Observer callback. '{0}' Exception: '{1}'", Level = EventLevel.Warning)]
public void MetricObserverCallbackError(string metricName, string exception)
[Event(16, Message = "Exception occurred while invoking Observable instrument callback. Exception: '{0}'", Level = EventLevel.Warning)]
public void ObservableInstrumentCallbackException(string exception)
{
this.WriteEvent(16, metricName, exception);
this.WriteEvent(16, exception);
}
[Event(17, Message = "Batcher finished collection with '{0}' metrics.", Level = EventLevel.Informational)]

View File

@ -34,16 +34,17 @@ namespace OpenTelemetry.Metrics
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer());
private AggregationTemporality temporality;
private MetricPoint[] metrics;
private readonly AggregationTemporality temporality;
private readonly bool outputDelta;
private readonly MetricPoint[] metrics;
private readonly AggregationType aggType;
private readonly double[] histogramBounds;
private readonly UpdateLongDelegate updateLongCallback;
private readonly UpdateDoubleDelegate updateDoubleCallback;
private int metricPointIndex = 0;
private bool zeroTagMetricPointInitialized;
private AggregationType aggType;
private double[] histogramBounds;
private DateTimeOffset startTimeExclusive;
private DateTimeOffset endTimeInclusive;
private UpdateLongDelegate updateLongCallback;
private UpdateDoubleDelegate updateDoubleCallback;
internal AggregatorStore(
AggregationType aggType,
@ -54,6 +55,7 @@ namespace OpenTelemetry.Metrics
this.metrics = new MetricPoint[MaxMetricPoints];
this.aggType = aggType;
this.temporality = temporality;
this.outputDelta = temporality == AggregationTemporality.Delta ? true : false;
this.histogramBounds = histogramBounds;
this.startTimeExclusive = DateTimeOffset.UtcNow;
if (tagKeysInteresting == null)
@ -102,7 +104,7 @@ namespace OpenTelemetry.Metrics
continue;
}
metricPoint.TakeSnapShot(this.temporality == AggregationTemporality.Delta ? true : false);
metricPoint.TakeSnapShot(this.outputDelta);
}
if (this.temporality == AggregationTemporality.Delta)

View File

@ -19,6 +19,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
namespace OpenTelemetry.Metrics
@ -348,7 +349,18 @@ namespace OpenTelemetry.Metrics
try
{
// Record all observable instruments
this.listener.RecordObservableInstruments();
try
{
this.listener.RecordObservableInstruments();
}
catch (Exception exception)
{
// TODO:
// It doesn't looks like we can find which instrument callback
// threw.
OpenTelemetrySdkEventSource.Log.MetricObserverCallbackException(exception);
}
var indexSnapShot = Math.Min(this.metricIndex, MaxMetrics - 1);
var target = indexSnapShot + 1;
for (int i = 0; i < target; i++)

View File

@ -25,8 +25,10 @@ using Xunit.Abstractions;
namespace OpenTelemetry.Metrics.Tests
{
#pragma warning disable SA1000 // KeywordsMustBeSpacedCorrectly https://github.com/DotNetAnalyzers/StyleCopAnalyzers/issues/3214
public class MetricApiTest
{
private const int MaxTimeToAllowForFlush = 10000;
private static int numberOfThreads = Environment.ProcessorCount;
private static long deltaLongValueUpdatedByEachCall = 10;
private static double deltaDoubleValueUpdatedByEachCall = 11.987;
@ -38,6 +40,77 @@ namespace OpenTelemetry.Metrics.Tests
this.output = output;
}
[Fact]
public void ObserverCallbackTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();
var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Single(exportedItems);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}
Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);
}
[Fact]
public void ObserverCallbackExceptionTest()
{
using var meter = new Meter("ObserverCallbackErrorTest");
var exportedItems = new List<Metric>();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddInMemoryExporter(exportedItems)
.Build();
var measurement = new Measurement<int>(100, new("name", "apple"), new("color", "red"));
meter.CreateObservableGauge("myGauge", () => measurement);
meter.CreateObservableGauge<long>("myBadGauge", observeValues: () => throw new Exception("gauge read error"));
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
Assert.Equal(2, exportedItems.Count);
var metric = exportedItems[0];
Assert.Equal("myGauge", metric.Name);
List<MetricPoint> metricPoints = new List<MetricPoint>();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}
Assert.Single(metricPoints);
var metricPoint = metricPoints[0];
Assert.Equal(100, metricPoint.LongValue);
Assert.NotNull(metricPoint.Keys);
Assert.NotNull(metricPoint.Values);
metric = exportedItems[1];
Assert.Equal("myBadGauge", metric.Name);
metricPoints.Clear();
foreach (ref var mp in metric.GetMetricPoints())
{
metricPoints.Add(mp);
}
Assert.Empty(metricPoints);
}
[Theory]
[InlineData(AggregationTemporality.Cumulative)]
[InlineData(AggregationTemporality.Delta)]
@ -495,4 +568,5 @@ namespace OpenTelemetry.Metrics.Tests
public T DeltaValueUpdatedByEachCall;
}
}
#pragma warning restore SA1000 // KeywordsMustBeSpacedCorrectly
}