From bf694a0641942d045bb68c0247d73df6f5e0f4cc Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Tue, 18 Aug 2020 20:58:00 -0700 Subject: [PATCH] Refactor exporter - step 6 (#1094) * implement BatchExportActivityProcessor * fix typo * wrap comments * no need to stop a Stopwatch * fix nit * add thread name * adopt zero-alloc enumerator * avoid calling exporter with zero item * better naming * clean up * fix the missing exportTrigger reset * shutdown drain till sentry * simplify the flow * simplify the code * periodic polling to avoid dead lock --- .../building-your-own-exporter/MyExporter.cs | 21 +-- .../MyExporterHelperExtensions.cs | 2 +- .../Trace/BatchExportActivityProcessor.cs | 159 ++++++++++++++++-- .../Trace/SimpleExportActivityProcessor.cs | 4 - 4 files changed, 152 insertions(+), 34 deletions(-) diff --git a/docs/trace/building-your-own-exporter/MyExporter.cs b/docs/trace/building-your-own-exporter/MyExporter.cs index 45f6a4a2a..653b1a655 100644 --- a/docs/trace/building-your-own-exporter/MyExporter.cs +++ b/docs/trace/building-your-own-exporter/MyExporter.cs @@ -15,17 +15,13 @@ // using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; using OpenTelemetry; using OpenTelemetry.Trace; -internal class MyExporter : ActivityExporter +internal class MyExporter : ActivityExporterSync { - public override Task ExportAsync( - IEnumerable batch, CancellationToken cancellationToken) + public override ExportResultSync Export(in Batch batch) { // Exporter code which can generate further // telemetry should do so inside SuppressInstrumentation @@ -38,17 +34,6 @@ internal class MyExporter : ActivityExporter Console.WriteLine($"{activity.DisplayName}"); } - return Task.FromResult(ExportResult.Success); - } - - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - Console.WriteLine($"MyExporter.ShutdownAsync"); - return Task.CompletedTask; - } - - protected override void Dispose(bool disposing) - { - Console.WriteLine($"MyExporter.Dispose"); + return ExportResultSync.Success; } } diff --git a/docs/trace/building-your-own-exporter/MyExporterHelperExtensions.cs b/docs/trace/building-your-own-exporter/MyExporterHelperExtensions.cs index 0a1975572..eea350882 100644 --- a/docs/trace/building-your-own-exporter/MyExporterHelperExtensions.cs +++ b/docs/trace/building-your-own-exporter/MyExporterHelperExtensions.cs @@ -26,6 +26,6 @@ internal static class MyExporterHelperExtensions throw new ArgumentNullException(nameof(builder)); } - return builder.AddProcessor(new SimpleActivityProcessor(new MyExporter())); + return builder.AddProcessor(new BatchExportActivityProcessor(new MyExporter())); } } diff --git a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs index 035f81fc0..03f6acb87 100644 --- a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs @@ -28,10 +28,15 @@ namespace OpenTelemetry.Trace public class BatchExportActivityProcessor : ActivityProcessor { private readonly ActivityExporterSync exporter; - private readonly CircularBuffer queue; - private readonly TimeSpan scheduledDelay; - private readonly TimeSpan exporterTimeout; + private readonly CircularBuffer circularBuffer; + private readonly int scheduledDelayMillis; + private readonly int exporterTimeoutMillis; private readonly int maxExportBatchSize; + private readonly Thread exporterThread; + private readonly AutoResetEvent exportTrigger = new AutoResetEvent(false); + private readonly ManualResetEvent dataExportedNotification = new ManualResetEvent(false); + private readonly ManualResetEvent shutdownTrigger = new ManualResetEvent(false); + private long shutdownDrainTarget = long.MaxValue; private bool disposed; private long droppedCount = 0; @@ -71,10 +76,16 @@ namespace OpenTelemetry.Trace } this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter)); - this.queue = new CircularBuffer(maxQueueSize); - this.scheduledDelay = TimeSpan.FromMilliseconds(scheduledDelayMillis); - this.exporterTimeout = TimeSpan.FromMilliseconds(exporterTimeoutMillis); + this.circularBuffer = new CircularBuffer(maxQueueSize); + this.scheduledDelayMillis = scheduledDelayMillis; + this.exporterTimeoutMillis = exporterTimeoutMillis; this.maxExportBatchSize = maxExportBatchSize; + this.exporterThread = new Thread(new ThreadStart(this.ExporterProc)) + { + IsBackground = true, + Name = $"OpenTelemetry-{nameof(BatchExportActivityProcessor)}-{exporter.GetType().Name}", + }; + this.exporterThread.Start(); } /// @@ -95,7 +106,7 @@ namespace OpenTelemetry.Trace { get { - return this.queue.AddedCount + this.DroppedCount; + return this.circularBuffer.AddedCount + this.DroppedCount; } } @@ -106,18 +117,18 @@ namespace OpenTelemetry.Trace { get { - return this.queue.RemovedCount; + return this.circularBuffer.RemovedCount; } } /// public override void OnEnd(Activity activity) { - if (this.queue.TryAdd(activity, maxSpinCount: 50000)) + if (this.circularBuffer.TryAdd(activity, maxSpinCount: 50000)) { - if (this.queue.Count >= this.maxExportBatchSize) + if (this.circularBuffer.Count >= this.maxExportBatchSize) { - // TODO: signal the exporter + this.exportTrigger.Set(); } return; // enqueue succeeded @@ -127,6 +138,78 @@ namespace OpenTelemetry.Trace Interlocked.Increment(ref this.droppedCount); } + /// + /// Flushes the currently in the queue, blocks + /// the current thread until flush completed, shutdown signaled or + /// timed out. + /// + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + /// + /// Returns true when flush completed; otherwise, false. + /// + public bool ForceFlush(int timeoutMillis = Timeout.Infinite) + { + if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + { + throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + } + + var tail = this.circularBuffer.RemovedCount; + var head = this.circularBuffer.AddedCount; + + if (head == tail) + { + return true; // nothing to flush + } + + this.exportTrigger.Set(); + + if (timeoutMillis == 0) + { + return false; + } + + var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; + + var sw = Stopwatch.StartNew(); + + // There is a chance that the export thread finished processing all the data from the queue, + // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. + const int pollingMillis = 1000; + + while (true) + { + if (timeoutMillis == Timeout.Infinite) + { + WaitHandle.WaitAny(triggers, pollingMillis); + } + else + { + var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return this.circularBuffer.RemovedCount >= head; + } + + WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMillis)); + } + + if (this.circularBuffer.RemovedCount >= head) + { + return true; + } + + if (this.shutdownDrainTarget != long.MaxValue) + { + return false; + } + } + } + /// /// If the is canceled. public override Task ForceFlushAsync(CancellationToken cancellationToken) @@ -135,6 +218,30 @@ namespace OpenTelemetry.Trace throw new NotImplementedException(); } + /// + /// Attempt to drain the queue and shutdown the exporter, blocks the + /// current thread until shutdown completed or timed out. + /// + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + public void Shutdown(int timeoutMillis = Timeout.Infinite) + { + if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + { + throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + } + + this.shutdownDrainTarget = this.circularBuffer.AddedCount; + this.shutdownTrigger.Set(); + + if (timeoutMillis != 0) + { + this.exporterThread.Join(timeoutMillis); + } + } + /// /// If the is canceled. public override Task ShutdownAsync(CancellationToken cancellationToken) @@ -153,6 +260,9 @@ namespace OpenTelemetry.Trace if (disposing && !this.disposed) { + // TODO: Dispose/Shutdown flow needs to be redesigned, currently it is convoluted. + this.Shutdown(this.exporterTimeoutMillis); + try { this.exporter.Dispose(); @@ -165,5 +275,32 @@ namespace OpenTelemetry.Trace this.disposed = true; } } + + private void ExporterProc() + { + var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; + + while (true) + { + // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously + if (this.circularBuffer.Count < this.maxExportBatchSize) + { + WaitHandle.WaitAny(triggers, this.scheduledDelayMillis); + } + + if (this.circularBuffer.Count > 0) + { + this.exporter.Export(new Batch(this.circularBuffer, this.maxExportBatchSize)); + + this.dataExportedNotification.Set(); + this.dataExportedNotification.Reset(); + } + + if (this.circularBuffer.RemovedCount >= this.shutdownDrainTarget) + { + break; + } + } + } } } diff --git a/src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs b/src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs index d25acadb8..9d2bd9ef3 100644 --- a/src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/SimpleExportActivityProcessor.cs @@ -14,11 +14,7 @@ // limitations under the License. // -using System; using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using OpenTelemetry.Internal; namespace OpenTelemetry.Trace {