Refactor exporter - step 6 (#1094)

* implement BatchExportActivityProcessor

* fix typo

* wrap comments

* no need to stop a Stopwatch

* fix nit

* add thread name

* adopt zero-alloc enumerator

* avoid calling exporter with zero item

* better naming

* clean up

* fix the missing exportTrigger reset

* shutdown drain till sentry

* simplify the flow

* simplify the code

* periodic polling to avoid dead lock
This commit is contained in:
Reiley Yang 2020-08-18 20:58:00 -07:00 committed by GitHub
parent a30634cf0a
commit bf694a0641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 152 additions and 34 deletions

View File

@ -15,17 +15,13 @@
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;
using OpenTelemetry.Trace;
internal class MyExporter : ActivityExporter
internal class MyExporter : ActivityExporterSync
{
public override Task<ExportResult> ExportAsync(
IEnumerable<Activity> batch, CancellationToken cancellationToken)
public override ExportResultSync Export(in Batch<Activity> batch)
{
// Exporter code which can generate further
// telemetry should do so inside SuppressInstrumentation
@ -38,17 +34,6 @@ internal class MyExporter : ActivityExporter
Console.WriteLine($"{activity.DisplayName}");
}
return Task.FromResult(ExportResult.Success);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"MyExporter.ShutdownAsync");
return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
Console.WriteLine($"MyExporter.Dispose");
return ExportResultSync.Success;
}
}

View File

@ -26,6 +26,6 @@ internal static class MyExporterHelperExtensions
throw new ArgumentNullException(nameof(builder));
}
return builder.AddProcessor(new SimpleActivityProcessor(new MyExporter()));
return builder.AddProcessor(new BatchExportActivityProcessor(new MyExporter()));
}
}

View File

@ -28,10 +28,15 @@ namespace OpenTelemetry.Trace
public class BatchExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private readonly CircularBuffer<Activity> queue;
private readonly TimeSpan scheduledDelay;
private readonly TimeSpan exporterTimeout;
private readonly CircularBuffer<Activity> circularBuffer;
private readonly int scheduledDelayMillis;
private readonly int exporterTimeoutMillis;
private readonly int maxExportBatchSize;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new AutoResetEvent(false);
private readonly ManualResetEvent dataExportedNotification = new ManualResetEvent(false);
private readonly ManualResetEvent shutdownTrigger = new ManualResetEvent(false);
private long shutdownDrainTarget = long.MaxValue;
private bool disposed;
private long droppedCount = 0;
@ -71,10 +76,16 @@ namespace OpenTelemetry.Trace
}
this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.queue = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis);
this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis);
this.circularBuffer = new CircularBuffer<Activity>(maxQueueSize);
this.scheduledDelayMillis = scheduledDelayMillis;
this.exporterTimeoutMillis = exporterTimeoutMillis;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(new ThreadStart(this.ExporterProc))
{
IsBackground = true,
Name = $"OpenTelemetry-{nameof(BatchExportActivityProcessor)}-{exporter.GetType().Name}",
};
this.exporterThread.Start();
}
/// <summary>
@ -95,7 +106,7 @@ namespace OpenTelemetry.Trace
{
get
{
return this.queue.AddedCount + this.DroppedCount;
return this.circularBuffer.AddedCount + this.DroppedCount;
}
}
@ -106,18 +117,18 @@ namespace OpenTelemetry.Trace
{
get
{
return this.queue.RemovedCount;
return this.circularBuffer.RemovedCount;
}
}
/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.queue.TryAdd(activity, maxSpinCount: 50000))
if (this.circularBuffer.TryAdd(activity, maxSpinCount: 50000))
{
if (this.queue.Count >= this.maxExportBatchSize)
if (this.circularBuffer.Count >= this.maxExportBatchSize)
{
// TODO: signal the exporter
this.exportTrigger.Set();
}
return; // enqueue succeeded
@ -127,6 +138,78 @@ namespace OpenTelemetry.Trace
Interlocked.Increment(ref this.droppedCount);
}
/// <summary>
/// Flushes the <see cref="Activity"/> currently in the queue, blocks
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}
var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;
if (head == tail)
{
return true; // nothing to flush
}
this.exportTrigger.Set();
if (timeoutMillis == 0)
{
return false;
}
var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };
var sw = Stopwatch.StartNew();
// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMillis = 1000;
while (true)
{
if (timeoutMillis == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers, pollingMillis);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMillis));
}
if (this.circularBuffer.RemovedCount >= head)
{
return true;
}
if (this.shutdownDrainTarget != long.MaxValue)
{
return false;
}
}
}
/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
@ -135,6 +218,30 @@ namespace OpenTelemetry.Trace
throw new NotImplementedException();
}
/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}
this.shutdownDrainTarget = this.circularBuffer.AddedCount;
this.shutdownTrigger.Set();
if (timeoutMillis != 0)
{
this.exporterThread.Join(timeoutMillis);
}
}
/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
@ -153,6 +260,9 @@ namespace OpenTelemetry.Trace
if (disposing && !this.disposed)
{
// TODO: Dispose/Shutdown flow needs to be redesigned, currently it is convoluted.
this.Shutdown(this.exporterTimeoutMillis);
try
{
this.exporter.Dispose();
@ -165,5 +275,32 @@ namespace OpenTelemetry.Trace
this.disposed = true;
}
}
private void ExporterProc()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };
while (true)
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.maxExportBatchSize)
{
WaitHandle.WaitAny(triggers, this.scheduledDelayMillis);
}
if (this.circularBuffer.Count > 0)
{
this.exporter.Export(new Batch<Activity>(this.circularBuffer, this.maxExportBatchSize));
this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}
if (this.circularBuffer.RemovedCount >= this.shutdownDrainTarget)
{
break;
}
}
}
}
}

View File

@ -14,11 +14,7 @@
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace
{