Refactor MetricReader (#2385)

This commit is contained in:
Reiley Yang 2021-09-20 11:50:56 -07:00 committed by GitHub
parent 58054d16cc
commit 1d63b31bf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 109 deletions

View File

@ -27,15 +27,9 @@ internal class MyReader : MetricReader
this.name = name;
}
protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnCollect(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}
protected override bool OnForceFlush(int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnForceFlush(timeoutMilliseconds={timeoutMilliseconds})");
Console.WriteLine($"{this.name}.ProcessMetrics(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}

View File

@ -54,18 +54,11 @@ namespace OpenTelemetry.Metrics
}
/// <inheritdoc/>
protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
return this.exporter.Export(metrics) == ExportResult.Success;
}
/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
// TODO: need to hammer this out
return true;
}
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{

View File

@ -69,10 +69,23 @@ namespace OpenTelemetry.Metrics
}
/// <inheritdoc/>
public override bool Collect(int timeoutMilliseconds = Timeout.Infinite)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
var cur = this.head;
// CompositeMetricReader delegates the work to its underlying readers,
// so CompositeMetricReader.ProcessMetrics should never be called.
throw new NotImplementedException();
}
/// <inheritdoc/>
protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}
var result = true;
var cur = this.head;
var sw = Stopwatch.StartNew();
while (cur != null)
@ -95,48 +108,6 @@ namespace OpenTelemetry.Metrics
return result;
}
protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
{
// CompositeMetricReader delegates the work to its underlying readers,
// so CompositeMetricReader.OnCollect should never be called.
throw new NotImplementedException();
}
/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
var cur = this.head;
var sw = Stopwatch.StartNew();
while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
_ = cur.Value.ForceFlush(Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
return false;
}
var succeeded = cur.Value.ForceFlush((int)timeout);
if (!succeeded)
{
return false;
}
}
cur = cur.Next;
}
return true;
}
/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{

View File

@ -200,7 +200,7 @@ namespace OpenTelemetry.Metrics
/// </remarks>
internal bool OnForceFlush(int timeoutMilliseconds)
{
return this.reader?.ForceFlush(timeoutMilliseconds) ?? true;
return this.reader?.Collect(timeoutMilliseconds) ?? true;
}
/// <summary>

View File

@ -49,42 +49,16 @@ namespace OpenTelemetry.Metrics
}
}
public virtual bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
var sw = Stopwatch.StartNew();
var collectMetric = this.ParentProvider.GetMetricCollect();
var metricsCollected = collectMetric();
if (timeoutMilliseconds == Timeout.Infinite)
{
this.OnCollect(metricsCollected, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
return false;
}
return this.OnCollect(metricsCollected, (int)timeout);
}
return true;
}
/// <summary>
/// Flushes the processor, blocks the current thread until flush
/// completed, shutdown signaled or timed out.
/// Attempts to collect the metrics, blocks the current thread until
/// metrics collection completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics collection succeeded; otherwise, <c>false</c>.
/// </returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Thrown when the <c>timeoutMilliseconds</c> is smaller than -1.
@ -92,7 +66,7 @@ namespace OpenTelemetry.Metrics
/// <remarks>
/// This function guarantees thread-safety.
/// </remarks>
public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
@ -101,11 +75,11 @@ namespace OpenTelemetry.Metrics
try
{
return this.OnForceFlush(timeoutMilliseconds);
return this.OnCollect(timeoutMilliseconds);
}
catch (Exception)
{
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ForceFlush), ex);
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Collect), ex);
return false;
}
}
@ -115,8 +89,8 @@ namespace OpenTelemetry.Metrics
/// shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
@ -163,27 +137,60 @@ namespace OpenTelemetry.Metrics
this.ParentProvider = parentProvider;
}
protected abstract bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds);
/// <summary>
/// Called by <c>ForceFlush</c>. This function should block the current
/// thread until flush completed, shutdown signaled or timed out.
/// Processes a batch of metrics.
/// </summary>
/// <param name="metrics">Batch of metrics to be processed.</param>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics processing succeeded; otherwise,
/// <c>false</c>.
/// </returns>
protected abstract bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds);
/// <summary>
/// Called by <c>Collect</c>. This function should block the current
/// thread until metrics collection completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when metrics collection succeeded; otherwise,
/// <c>false</c>.
/// </returns>
/// <remarks>
/// This function is called synchronously on the thread which called
/// <c>ForceFlush</c>. This function should be thread-safe, and should
/// <c>Collect</c>. This function should be thread-safe, and should
/// not throw exceptions.
/// </remarks>
protected virtual bool OnForceFlush(int timeoutMilliseconds)
protected virtual bool OnCollect(int timeoutMilliseconds)
{
return true;
var sw = Stopwatch.StartNew();
var collectMetric = this.ParentProvider.GetMetricCollect();
var metrics = collectMetric();
if (timeoutMilliseconds == Timeout.Infinite)
{
return this.ProcessMetrics(metrics, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
return false;
}
return this.ProcessMetrics(metrics, (int)timeout);
}
}
/// <summary>
@ -191,8 +198,8 @@ namespace OpenTelemetry.Metrics
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.