// // Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #nullable enable using System.Diagnostics; using System.Runtime.CompilerServices; using OpenTelemetry.Internal; namespace OpenTelemetry { /// /// Implements processor that batches telemetry objects before calling exporter. /// /// The type of telemetry object to be exported. public abstract class BatchExportProcessor : BaseExportProcessor where T : class { internal const int DefaultMaxQueueSize = 2048; internal const int DefaultScheduledDelayMilliseconds = 5000; internal const int DefaultExporterTimeoutMilliseconds = 30000; internal const int DefaultMaxExportBatchSize = 512; internal readonly int MaxExportBatchSize; private readonly CircularBuffer circularBuffer; private readonly int scheduledDelayMilliseconds; private readonly int exporterTimeoutMilliseconds; private readonly Thread exporterThread; private readonly AutoResetEvent exportTrigger = new(false); private readonly ManualResetEvent dataExportedNotification = new(false); private readonly ManualResetEvent shutdownTrigger = new(false); private long shutdownDrainTarget = long.MaxValue; private long droppedCount; private bool disposed; /// /// Initializes a new instance of the class. /// /// Exporter instance. /// The maximum queue size. After the size is reached data are dropped. The default value is 2048. /// The delay interval in milliseconds between two consecutive exports. The default value is 5000. /// How long the export can run before it is cancelled. The default value is 30000. /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. The default value is 512. protected BatchExportProcessor( BaseExporter exporter, int maxQueueSize = DefaultMaxQueueSize, int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds, int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds, int maxExportBatchSize = DefaultMaxExportBatchSize) : base(exporter) { Guard.ThrowIfOutOfRange(maxQueueSize, min: 1); Guard.ThrowIfOutOfRange(maxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(maxQueueSize)); Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1); Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0); this.circularBuffer = new CircularBuffer(maxQueueSize); this.scheduledDelayMilliseconds = scheduledDelayMilliseconds; this.exporterTimeoutMilliseconds = exporterTimeoutMilliseconds; this.MaxExportBatchSize = maxExportBatchSize; this.exporterThread = new Thread(this.ExporterProc) { IsBackground = true, Name = $"OpenTelemetry-{nameof(BatchExportProcessor)}-{exporter.GetType().Name}", }; this.exporterThread.Start(); } /// /// Gets the number of telemetry objects dropped by the processor. /// internal long DroppedCount => Volatile.Read(ref this.droppedCount); /// /// Gets the number of telemetry objects received by the processor. /// internal long ReceivedCount => this.circularBuffer.AddedCount + this.DroppedCount; /// /// Gets the number of telemetry objects processed by the underlying exporter. /// internal long ProcessedCount => this.circularBuffer.RemovedCount; [MethodImpl(MethodImplOptions.AggressiveInlining)] internal bool TryExport(T data) { if (this.circularBuffer.TryAdd(data, maxSpinCount: 50000)) { if (this.circularBuffer.Count >= this.MaxExportBatchSize) { try { this.exportTrigger.Set(); } catch (ObjectDisposedException) { } } return true; // enqueue succeeded } // either the queue is full or exceeded the spin limit, drop the item on the floor Interlocked.Increment(ref this.droppedCount); return false; } /// protected override void OnExport(T data) { this.TryExport(data); } /// protected override bool OnForceFlush(int timeoutMilliseconds) { var tail = this.circularBuffer.RemovedCount; var head = this.circularBuffer.AddedCount; if (head == tail) { return true; // nothing to flush } try { this.exportTrigger.Set(); } catch (ObjectDisposedException) { return false; } if (timeoutMilliseconds == 0) { return false; } var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger }; var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew(); // There is a chance that the export thread finished processing all the data from the queue, // and signaled before we enter wait here, use polling to prevent being blocked indefinitely. const int pollingMilliseconds = 1000; while (true) { if (sw == null) { try { WaitHandle.WaitAny(triggers, pollingMilliseconds); } catch (ObjectDisposedException) { return false; } } else { var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; if (timeout <= 0) { return this.circularBuffer.RemovedCount >= head; } try { WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds)); } catch (ObjectDisposedException) { return false; } } if (this.circularBuffer.RemovedCount >= head) { return true; } if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue) { return false; } } } /// protected override bool OnShutdown(int timeoutMilliseconds) { Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount); try { this.shutdownTrigger.Set(); } catch (ObjectDisposedException) { return false; } OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount); if (timeoutMilliseconds == Timeout.Infinite) { this.exporterThread.Join(); return this.exporter.Shutdown(); } if (timeoutMilliseconds == 0) { return this.exporter.Shutdown(0); } var sw = Stopwatch.StartNew(); this.exporterThread.Join(timeoutMilliseconds); var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; return this.exporter.Shutdown((int)Math.Max(timeout, 0)); } /// protected override void Dispose(bool disposing) { if (!this.disposed) { if (disposing) { this.exportTrigger.Dispose(); this.dataExportedNotification.Dispose(); this.shutdownTrigger.Dispose(); } this.disposed = true; } base.Dispose(disposing); } private void ExporterProc() { var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger }; while (true) { // only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously if (this.circularBuffer.Count < this.MaxExportBatchSize) { try { WaitHandle.WaitAny(triggers, this.scheduledDelayMilliseconds); } catch (ObjectDisposedException) { // the exporter is somehow disposed before the worker thread could finish its job return; } } if (this.circularBuffer.Count > 0) { using (var batch = new Batch(this.circularBuffer, this.MaxExportBatchSize)) { this.exporter.Export(batch); } try { this.dataExportedNotification.Set(); this.dataExportedNotification.Reset(); } catch (ObjectDisposedException) { // the exporter is somehow disposed before the worker thread could finish its job return; } } if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget)) { return; } } } } }