195 lines
5.3 KiB
C#
195 lines
5.3 KiB
C#
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
using System.Diagnostics;
|
|
using OpenTelemetry.Internal;
|
|
|
|
namespace OpenTelemetry;
|
|
|
|
/// <summary>
|
|
/// Represents a chain of <see cref="BaseProcessor{T}"/>s.
|
|
/// </summary>
|
|
/// <typeparam name="T">The type of object to be processed.</typeparam>
|
|
public class CompositeProcessor<T> : BaseProcessor<T>
|
|
{
|
|
internal readonly DoublyLinkedListNode Head;
|
|
private DoublyLinkedListNode tail;
|
|
private bool disposed;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="CompositeProcessor{T}"/> class.
|
|
/// </summary>
|
|
/// <param name="processors">Processors to add to the composite processor chain.</param>
|
|
public CompositeProcessor(IEnumerable<BaseProcessor<T>> processors)
|
|
{
|
|
Guard.ThrowIfNull(processors);
|
|
|
|
#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1
|
|
using var iter = processors.GetEnumerator();
|
|
#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1
|
|
if (!iter.MoveNext())
|
|
{
|
|
throw new ArgumentException($"'{iter}' is null or empty", nameof(processors));
|
|
}
|
|
|
|
this.Head = new DoublyLinkedListNode(iter.Current);
|
|
this.tail = this.Head;
|
|
|
|
while (iter.MoveNext())
|
|
{
|
|
this.AddProcessor(iter.Current);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Adds a processor to the composite processor chain.
|
|
/// </summary>
|
|
/// <param name="processor"><see cref="BaseProcessor{T}"/>.</param>
|
|
/// <returns>The current instance to support call chaining.</returns>
|
|
public CompositeProcessor<T> AddProcessor(BaseProcessor<T> processor)
|
|
{
|
|
Guard.ThrowIfNull(processor);
|
|
|
|
var node = new DoublyLinkedListNode(processor)
|
|
{
|
|
Previous = this.tail,
|
|
};
|
|
this.tail.Next = node;
|
|
this.tail = node;
|
|
|
|
return this;
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
public override void OnEnd(T data)
|
|
{
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
cur.Value.OnEnd(data);
|
|
}
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
public override void OnStart(T data)
|
|
{
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
cur.Value.OnStart(data);
|
|
}
|
|
}
|
|
|
|
internal override void SetParentProvider(BaseProvider parentProvider)
|
|
{
|
|
base.SetParentProvider(parentProvider);
|
|
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
cur.Value.SetParentProvider(parentProvider);
|
|
}
|
|
}
|
|
|
|
internal IReadOnlyList<BaseProcessor<T>> ToReadOnlyList()
|
|
{
|
|
var list = new List<BaseProcessor<T>>();
|
|
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
list.Add(cur.Value);
|
|
}
|
|
|
|
return list;
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
protected override bool OnForceFlush(int timeoutMilliseconds)
|
|
{
|
|
var result = true;
|
|
var sw = timeoutMilliseconds == Timeout.Infinite
|
|
? null
|
|
: Stopwatch.StartNew();
|
|
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
if (sw == null)
|
|
{
|
|
result = cur.Value.ForceFlush() && result;
|
|
}
|
|
else
|
|
{
|
|
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
|
|
|
|
// notify all the processors, even if we run overtime
|
|
result = cur.Value.ForceFlush((int)Math.Max(timeout, 0)) && result;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
protected override bool OnShutdown(int timeoutMilliseconds)
|
|
{
|
|
var result = true;
|
|
var sw = timeoutMilliseconds == Timeout.Infinite
|
|
? null
|
|
: Stopwatch.StartNew();
|
|
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
if (sw == null)
|
|
{
|
|
result = cur.Value.Shutdown() && result;
|
|
}
|
|
else
|
|
{
|
|
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
|
|
|
|
// notify all the processors, even if we run overtime
|
|
result = cur.Value.Shutdown((int)Math.Max(timeout, 0)) && result;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/// <inheritdoc/>
|
|
protected override void Dispose(bool disposing)
|
|
{
|
|
if (!this.disposed)
|
|
{
|
|
if (disposing)
|
|
{
|
|
for (var cur = this.Head; cur != null; cur = cur.Next)
|
|
{
|
|
try
|
|
{
|
|
cur.Value.Dispose();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
|
|
}
|
|
}
|
|
}
|
|
|
|
this.disposed = true;
|
|
}
|
|
|
|
base.Dispose(disposing);
|
|
}
|
|
|
|
internal sealed class DoublyLinkedListNode
|
|
{
|
|
public readonly BaseProcessor<T> Value;
|
|
|
|
public DoublyLinkedListNode(BaseProcessor<T> value)
|
|
{
|
|
this.Value = value;
|
|
}
|
|
|
|
public DoublyLinkedListNode? Previous { get; set; }
|
|
|
|
public DoublyLinkedListNode? Next { get; set; }
|
|
}
|
|
}
|