Logs: Add log record pooling (#3385)

* Add log record pooling.

* Tweak.

* Update CHANGELOG.

* Code review.

* Code review.

* Code review.

* Code review.

* Nit.

* Improvement.

* Code review.

* Make resize internal.

* Add more details to comments about write race.

* State buffering tests.

* Comments about buffering.

* Added code comments.

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
This commit is contained in:
Mikel Blanchard 2022-06-30 09:33:09 -07:00 committed by GitHub
parent 09ef47dbc1
commit 9858e3ace3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1148 additions and 51 deletions

View File

@ -42,9 +42,7 @@ namespace OpenTelemetry.Logs
foreach (var log in batch)
{
log.BufferLogScopes();
exportedItems.Add(log);
exportedItems.Add(log.Copy());
}
return ExportResult.Success;

View File

@ -22,6 +22,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;
namespace OpenTelemetry
{
@ -90,7 +91,11 @@ namespace OpenTelemetry
// Drain anything left in the batch.
while (this.circularBuffer.RemovedCount < this.targetCount)
{
this.circularBuffer.Read();
T item = this.circularBuffer.Read();
if (typeof(T) == typeof(LogRecord))
{
LogRecordSharedPool.Current.Return((LogRecord)(object)item);
}
}
}
}
@ -140,6 +145,32 @@ namespace OpenTelemetry
return false;
};
private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBufferLogRecord = (ref Enumerator enumerator) =>
{
// Note: This type check here is to give the JIT a hint it can
// remove all of this code when T != LogRecord
if (typeof(T) == typeof(LogRecord))
{
var circularBuffer = enumerator.circularBuffer;
var currentItem = enumerator.Current;
if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
}
if (circularBuffer!.RemovedCount < enumerator.targetCount)
{
enumerator.current = circularBuffer.Read();
return true;
}
enumerator.current = null;
}
return false;
};
private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) =>
{
var items = enumerator.items;
@ -179,7 +210,7 @@ namespace OpenTelemetry
this.circularBuffer = circularBuffer;
this.targetCount = targetCount;
this.itemIndex = 0;
this.moveNextFunc = MoveNextCircularBuffer;
this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer;
}
internal Enumerator(T[] items, long targetCount)
@ -201,6 +232,15 @@ namespace OpenTelemetry
/// <inheritdoc/>
public void Dispose()
{
if (typeof(T) == typeof(LogRecord))
{
var currentItem = this.current;
if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
this.current = null;
}
}
}
/// <inheritdoc/>

View File

@ -18,6 +18,7 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using OpenTelemetry.Internal;
@ -95,8 +96,8 @@ namespace OpenTelemetry
/// </summary>
internal long ProcessedCount => this.circularBuffer.RemovedCount;
/// <inheritdoc/>
protected override void OnExport(T data)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal bool TryExport(T data)
{
if (this.circularBuffer.TryAdd(data, maxSpinCount: 50000))
{
@ -111,11 +112,19 @@ namespace OpenTelemetry
}
}
return; // enqueue succeeded
return true; // enqueue succeeded
}
// either the queue is full or exceeded the spin limit, drop the item on the floor
Interlocked.Increment(ref this.droppedCount);
return false;
}
/// <inheritdoc/>
protected override void OnExport(T data)
{
this.TryExport(data);
}
/// <inheritdoc/>

View File

@ -25,6 +25,9 @@
* Handle possible exception when initializing the default service name.
([#3405](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3405))
* `LogRecord` instances are now reused to reduce memory pressure
([#3385](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3385))
## 1.3.0
Released 2022-Jun-03

View File

@ -26,7 +26,7 @@ namespace OpenTelemetry
{
public class CompositeProcessor<T> : BaseProcessor<T>
{
private readonly DoublyLinkedListNode head;
internal readonly DoublyLinkedListNode Head;
private DoublyLinkedListNode tail;
private bool disposed;
@ -40,8 +40,8 @@ namespace OpenTelemetry
throw new ArgumentException($"'{iter}' is null or empty", nameof(iter));
}
this.head = new DoublyLinkedListNode(iter.Current);
this.tail = this.head;
this.Head = new DoublyLinkedListNode(iter.Current);
this.tail = this.Head;
while (iter.MoveNext())
{
@ -66,7 +66,7 @@ namespace OpenTelemetry
/// <inheritdoc/>
public override void OnEnd(T data)
{
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
cur.Value.OnEnd(data);
}
@ -75,7 +75,7 @@ namespace OpenTelemetry
/// <inheritdoc/>
public override void OnStart(T data)
{
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
cur.Value.OnStart(data);
}
@ -85,7 +85,7 @@ namespace OpenTelemetry
{
base.SetParentProvider(parentProvider);
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
cur.Value.SetParentProvider(parentProvider);
}
@ -99,7 +99,7 @@ namespace OpenTelemetry
? null
: Stopwatch.StartNew();
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
if (sw == null)
{
@ -125,7 +125,7 @@ namespace OpenTelemetry
? null
: Stopwatch.StartNew();
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
if (sw == null)
{
@ -150,7 +150,7 @@ namespace OpenTelemetry
{
if (disposing)
{
for (var cur = this.head; cur != null; cur = cur.Next)
for (var cur = this.Head; cur != null; cur = cur.Next)
{
try
{
@ -169,7 +169,7 @@ namespace OpenTelemetry
base.Dispose(disposing);
}
private class DoublyLinkedListNode
internal sealed class DoublyLinkedListNode
{
public readonly BaseProcessor<T> Value;

View File

@ -57,9 +57,14 @@ namespace OpenTelemetry
// happen here.
Debug.Assert(data != null, "LogRecord was null.");
data!.BufferLogScopes();
data!.Buffer();
base.OnEnd(data);
data.AddReference();
if (!this.TryExport(data))
{
LogRecordSharedPool.Current.Return(data);
}
}
}
}

View File

@ -19,6 +19,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Internal;
@ -30,15 +32,21 @@ namespace OpenTelemetry.Logs
public sealed class LogRecord
{
internal LogRecordData Data;
internal List<KeyValuePair<string, object?>>? AttributeStorage;
internal List<object?>? BufferedScopes;
internal int PoolReferenceCount = int.MaxValue;
private static readonly Action<object?, List<object?>> AddScopeToBufferedList = (object? scope, List<object?> state) =>
{
state.Add(scope);
};
private List<object?>? bufferedScopes;
internal LogRecord()
{
}
// Note: Some users are calling this with reflection. Try not to change the signature to be nice.
[Obsolete("Call LogRecordPool.Rent instead.")]
internal LogRecord(
IExternalScopeProvider? scopeProvider,
DateTime timestamp,
@ -191,9 +199,9 @@ namespace OpenTelemetry.Logs
var forEachScopeState = new ScopeForEachState<TState>(callback, state);
if (this.bufferedScopes != null)
if (this.BufferedScopes != null)
{
foreach (object? scope in this.bufferedScopes)
foreach (object? scope in this.BufferedScopes)
{
ScopeForEachState<TState>.ForEachScope(scope, forEachScopeState);
}
@ -213,22 +221,99 @@ namespace OpenTelemetry.Logs
return ref this.Data;
}
/// <summary>
/// Buffers the scopes attached to the log into a list so that they can
/// be safely processed after the log message lifecycle has ended.
/// </summary>
internal void BufferLogScopes()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void ResetReferenceCount()
{
if (this.ScopeProvider == null || this.bufferedScopes != null)
this.PoolReferenceCount = 1;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void AddReference()
{
Interlocked.Increment(ref this.PoolReferenceCount);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal int RemoveReference()
{
return Interlocked.Decrement(ref this.PoolReferenceCount);
}
// Note: Typically called when LogRecords are added into a batch so they
// can be safely processed outside of the log call chain.
internal void Buffer()
{
// Note: State values are buffered because some states are not safe
// to access outside of the log call chain. See:
// https://github.com/open-telemetry/opentelemetry-dotnet/issues/2905
this.BufferLogStateValues();
this.BufferLogScopes();
// Note: There is no buffering of "State" only "StateValues". We
// don't inspect "object State" at all. It is undefined what
// exporters will do with "State". Some might ignore it, some might
// attempt to access it as a list. That is potentially dangerous.
// TODO: Investigate what to do here. Should we obsolete State and
// just use the StateValues design?
}
internal LogRecord Copy()
{
// Note: We only buffer scopes here because state values are copied
// directly below.
this.BufferLogScopes();
return new()
{
Data = this.Data,
State = this.State,
StateValues = this.StateValues == null ? null : new List<KeyValuePair<string, object?>>(this.StateValues),
BufferedScopes = this.BufferedScopes == null ? null : new List<object?>(this.BufferedScopes),
};
}
/// <summary>
/// Buffers the state values attached to the log into a list so that
/// they can be safely processed after the log message lifecycle has
/// ended.
/// </summary>
private void BufferLogStateValues()
{
var stateValues = this.StateValues;
if (stateValues == null || stateValues == this.AttributeStorage)
{
return;
}
List<object?> scopes = new List<object?>();
var attributeStorage = this.AttributeStorage ??= new List<KeyValuePair<string, object?>>(stateValues.Count);
this.ScopeProvider?.ForEachScope(AddScopeToBufferedList, scopes);
// Note: AddRange here will copy all of the KeyValuePairs from
// stateValues to AttributeStorage. This "captures" the state and
// fixes issues where the values are generated at enumeration time
// like
// https://github.com/open-telemetry/opentelemetry-dotnet/issues/2905.
attributeStorage.AddRange(stateValues);
this.bufferedScopes = scopes;
this.StateValues = attributeStorage;
}
/// <summary>
/// Buffers the scopes attached to the log into a list so that they can
/// be safely processed after the log message lifecycle has ended.
/// </summary>
private void BufferLogScopes()
{
if (this.ScopeProvider == null)
{
return;
}
List<object?> scopes = this.BufferedScopes ??= new List<object?>(LogRecordPoolHelper.DefaultMaxNumberOfScopes);
this.ScopeProvider.ForEachScope(AddScopeToBufferedList, scopes);
this.ScopeProvider = null;
}
private readonly struct ScopeForEachState<TState>

View File

@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Internal;
@ -52,20 +53,32 @@ namespace OpenTelemetry.Logs
var processor = provider.Processor;
if (processor != null)
{
var record = new LogRecord(
provider.IncludeScopes ? this.ScopeProvider : null,
DateTime.UtcNow,
this.categoryName,
logLevel,
eventId,
provider.IncludeFormattedMessage ? formatter?.Invoke(state, exception) : null,
provider.ParseStateValues ? null : state,
exception,
provider.ParseStateValues ? this.ParseState(state) : null);
var pool = provider.LogRecordPool;
var record = pool.Rent();
record.ScopeProvider = provider.IncludeScopes ? this.ScopeProvider : null;
record.State = provider.ParseStateValues ? null : state;
record.StateValues = provider.ParseStateValues ? ParseState(record, state) : null;
ref LogRecordData data = ref record.Data;
data.TimestampBacking = DateTime.UtcNow;
data.CategoryName = this.categoryName;
data.LogLevel = logLevel;
data.EventId = eventId;
data.Message = provider.IncludeFormattedMessage ? formatter?.Invoke(state, exception) : null;
data.Exception = exception;
LogRecordData.SetActivityContext(ref data, Activity.Current);
processor.OnEnd(record);
record.ScopeProvider = null;
// Attempt to return the LogRecord to the pool. This will no-op
// if a batch exporter has added a reference.
pool.Return(record);
}
}
@ -77,7 +90,7 @@ namespace OpenTelemetry.Logs
public IDisposable BeginScope<TState>(TState state) => this.ScopeProvider?.Push(state) ?? NullScope.Instance;
private IReadOnlyList<KeyValuePair<string, object?>> ParseState<TState>(TState state)
private static IReadOnlyList<KeyValuePair<string, object?>> ParseState<TState>(LogRecord logRecord, TState state)
{
if (state is IReadOnlyList<KeyValuePair<string, object?>> stateList)
{
@ -85,14 +98,22 @@ namespace OpenTelemetry.Logs
}
else if (state is IEnumerable<KeyValuePair<string, object?>> stateValues)
{
return new List<KeyValuePair<string, object?>>(stateValues);
var attributeStorage = logRecord.AttributeStorage;
if (attributeStorage == null)
{
return logRecord.AttributeStorage = new List<KeyValuePair<string, object?>>(stateValues);
}
else
{
attributeStorage.AddRange(stateValues);
return attributeStorage;
}
}
else
{
return new List<KeyValuePair<string, object?>>
{
new KeyValuePair<string, object?>(string.Empty, state),
};
var attributeStorage = logRecord.AttributeStorage ??= new List<KeyValuePair<string, object?>>(LogRecordPoolHelper.DefaultMaxNumberOfAttributes);
attributeStorage.Add(new KeyValuePair<string, object?>(string.Empty, state));
return attributeStorage;
}
}

View File

@ -38,6 +38,7 @@ namespace OpenTelemetry.Logs
internal BaseProcessor<LogRecord>? Processor;
internal Resource Resource;
private readonly Hashtable loggers = new();
private ILogRecordPool? threadStaticPool = LogRecordThreadStaticPool.Instance;
private bool disposed;
static OpenTelemetryLoggerProvider()
@ -52,7 +53,7 @@ namespace OpenTelemetry.Logs
/// </summary>
/// <param name="options"><see cref="OpenTelemetryLoggerOptions"/>.</param>
public OpenTelemetryLoggerProvider(IOptionsMonitor<OpenTelemetryLoggerOptions> options)
: this(options?.CurrentValue!)
: this(options?.CurrentValue ?? throw new ArgumentNullException(nameof(options)))
{
}
@ -91,6 +92,8 @@ namespace OpenTelemetry.Logs
internal IExternalScopeProvider? ScopeProvider { get; private set; }
internal ILogRecordPool LogRecordPool => this.threadStaticPool ?? LogRecordSharedPool.Current;
/// <inheritdoc/>
void ISupportExternalScope.SetScopeProvider(IExternalScopeProvider scopeProvider)
{
@ -160,6 +163,11 @@ namespace OpenTelemetry.Logs
processor.SetParentProvider(this);
if (this.threadStaticPool != null && this.ContainsBatchProcessor(processor))
{
this.threadStaticPool = null;
}
if (this.Processor == null)
{
this.Processor = processor;
@ -182,6 +190,29 @@ namespace OpenTelemetry.Logs
return this;
}
internal bool ContainsBatchProcessor(BaseProcessor<LogRecord> processor)
{
if (processor is BatchExportProcessor<LogRecord>)
{
return true;
}
else if (processor is CompositeProcessor<LogRecord> compositeProcessor)
{
var current = compositeProcessor.Head;
while (current != null)
{
if (this.ContainsBatchProcessor(current.Value))
{
return true;
}
current = current.Next;
}
}
return false;
}
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{

View File

@ -0,0 +1,27 @@
// <copyright file="ILogRecordPool.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
namespace OpenTelemetry.Logs
{
internal interface ILogRecordPool
{
LogRecord Rent();
void Return(LogRecord logRecord);
}
}

View File

@ -0,0 +1,61 @@
// <copyright file="LogRecordPoolHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
namespace OpenTelemetry.Logs
{
internal static class LogRecordPoolHelper
{
public const int DefaultMaxNumberOfAttributes = 64;
public const int DefaultMaxNumberOfScopes = 16;
public static void Clear(LogRecord logRecord)
{
var attributeStorage = logRecord.AttributeStorage;
if (attributeStorage != null)
{
if (attributeStorage.Count > DefaultMaxNumberOfAttributes)
{
// Don't allow the pool to grow unconstained.
logRecord.AttributeStorage = null;
}
else
{
/* List<T>.Clear sets the count/size to 0 but it maintains the
underlying array (capacity). */
attributeStorage.Clear();
}
}
var bufferedScopes = logRecord.BufferedScopes;
if (bufferedScopes != null)
{
if (bufferedScopes.Count > DefaultMaxNumberOfScopes)
{
// Don't allow the pool to grow unconstained.
logRecord.BufferedScopes = null;
}
else
{
/* List<T>.Clear sets the count/size to 0 but it maintains the
underlying array (capacity). */
bufferedScopes.Clear();
}
}
}
}
}

View File

@ -0,0 +1,150 @@
// <copyright file="LogRecordSharedPool.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Logs
{
internal sealed class LogRecordSharedPool : ILogRecordPool
{
public const int DefaultMaxPoolSize = 2048;
public static LogRecordSharedPool Current = new(DefaultMaxPoolSize);
public readonly int Capacity;
private readonly LogRecord?[] pool;
private long rentIndex;
private long returnIndex;
public LogRecordSharedPool(int capacity)
{
this.Capacity = capacity;
this.pool = new LogRecord?[capacity];
}
public int Count => (int)(Volatile.Read(ref this.returnIndex) - Volatile.Read(ref this.rentIndex));
// Note: It might make sense to expose this (somehow) in the future.
// Ideal config is shared pool capacity == max batch size.
public static void Resize(int capacity)
{
Guard.ThrowIfOutOfRange(capacity, min: 1);
Current = new(capacity);
}
public LogRecord Rent()
{
while (true)
{
var rentSnapshot = Volatile.Read(ref this.rentIndex);
var returnSnapshot = Volatile.Read(ref this.returnIndex);
if (rentSnapshot >= returnSnapshot)
{
break; // buffer is empty
}
if (Interlocked.CompareExchange(ref this.rentIndex, rentSnapshot + 1, rentSnapshot) == rentSnapshot)
{
var logRecord = Interlocked.Exchange(ref this.pool[rentSnapshot % this.Capacity], null);
if (logRecord == null && !this.TryRentCoreRare(rentSnapshot, out logRecord))
{
continue;
}
logRecord.ResetReferenceCount();
return logRecord;
}
}
var newLogRecord = new LogRecord();
newLogRecord.ResetReferenceCount();
return newLogRecord;
}
public void Return(LogRecord logRecord)
{
if (logRecord.RemoveReference() != 0)
{
return;
}
LogRecordPoolHelper.Clear(logRecord);
while (true)
{
var rentSnapshot = Volatile.Read(ref this.rentIndex);
var returnSnapshot = Volatile.Read(ref this.returnIndex);
if (returnSnapshot - rentSnapshot >= this.Capacity)
{
return; // buffer is full
}
if (Interlocked.CompareExchange(ref this.returnIndex, returnSnapshot + 1, returnSnapshot) == returnSnapshot)
{
// If many threads are hammering rent/return it is possible
// for two threads to write to the same index. In that case
// only one of the logRecords will make it back into the
// pool. Anything lost in the race will collected by the GC
// and the pool will issue new instances as needed. This
// could be abated by an Interlocked.CompareExchange here
// but for the general use case of an exporter returning
// records one-by-one, better to keep this fast and not pay
// for Interlocked.CompareExchange. The race is more
// theoretical.
this.pool[returnSnapshot % this.Capacity] = logRecord;
return;
}
}
}
private bool TryRentCoreRare(long rentSnapshot, [NotNullWhen(true)] out LogRecord? logRecord)
{
SpinWait wait = default;
while (true)
{
if (wait.NextSpinWillYield)
{
// Super rare case. If many threads are hammering
// rent/return it is possible a read was issued an index and
// then yielded while other threads caused the pointers to
// wrap around. When the yielded thread wakes up its read
// index could have been stolen by another thread. To
// prevent deadlock, bail out of read after spinning. This
// will cause either a successful rent from another index,
// or a new record to be created
logRecord = null;
return false;
}
wait.SpinOnce();
logRecord = Interlocked.Exchange(ref this.pool[rentSnapshot % this.Capacity], null);
if (logRecord != null)
{
// Rare case where the write was still working when the read came in
return true;
}
}
}
}
}

View File

@ -0,0 +1,55 @@
// <copyright file="LogRecordThreadStaticPool.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
using System;
namespace OpenTelemetry.Logs
{
internal sealed class LogRecordThreadStaticPool : ILogRecordPool
{
[ThreadStatic]
public static LogRecord? Storage;
private LogRecordThreadStaticPool()
{
}
public static LogRecordThreadStaticPool Instance { get; } = new();
public LogRecord Rent()
{
var logRecord = Storage;
if (logRecord != null)
{
Storage = null;
return logRecord;
}
return new();
}
public void Return(LogRecord logRecord)
{
if (Storage == null)
{
LogRecordPoolHelper.Clear(logRecord);
Storage = logRecord;
}
}
}
}

View File

@ -56,6 +56,7 @@ namespace Benchmarks.Logs
new KeyValuePair<string, object>("item5", "value5"),
}));
#pragma warning disable CS0618 // Type or member is obsolete
this.logRecord = new LogRecord(
this.scopeProvider,
DateTime.UtcNow,
@ -66,6 +67,7 @@ namespace Benchmarks.Logs
null,
null,
null);
#pragma warning restore CS0618 // Type or member is obsolete
}
[Benchmark]

View File

@ -0,0 +1,107 @@
// <copyright file="BatchLogRecordExportProcessorTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#if !NETFRAMEWORK
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Exporter;
using Xunit;
namespace OpenTelemetry.Logs.Tests
{
public sealed class BatchLogRecordExportProcessorTests
{
[Fact]
public void StateValuesAndScopeBufferingTest()
{
var scopeProvider = new LoggerExternalScopeProvider();
List<LogRecord> exportedItems = new();
using var exporter = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));
using var scope = scopeProvider.Push(exportedItems);
var logRecord = new LogRecord();
var state = new LogRecordTest.DisposingState("Hello world");
logRecord.ScopeProvider = scopeProvider;
logRecord.StateValues = state;
exporter.OnEnd(logRecord);
state.Dispose();
Assert.Empty(exportedItems);
Assert.Null(logRecord.ScopeProvider);
Assert.False(ReferenceEquals(state, logRecord.StateValues));
Assert.NotNull(logRecord.AttributeStorage);
Assert.NotNull(logRecord.BufferedScopes);
KeyValuePair<string, object> actualState = logRecord.StateValues[0];
Assert.Same("Value", actualState.Key);
Assert.Same("Hello world", actualState.Value);
bool foundScope = false;
logRecord.ForEachScope<object>(
(s, o) =>
{
foundScope = ReferenceEquals(s.Scope, exportedItems);
},
null);
Assert.True(foundScope);
}
[Fact]
public void StateBufferingTest()
{
// LogRecord.State is never inspected or buffered. Accessing it
// after OnEnd may throw. This test verifies that behavior. TODO:
// Investigate this. Potentially obsolete logRecord.State and force
// StateValues/ParseStateValues behavior.
List<LogRecord> exportedItems = new();
using var exporter = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));
var logRecord = new LogRecord();
var state = new LogRecordTest.DisposingState("Hello world");
logRecord.State = state;
exporter.OnEnd(logRecord);
state.Dispose();
Assert.Throws<ObjectDisposedException>(() =>
{
IReadOnlyList<KeyValuePair<string, object>> state = (IReadOnlyList<KeyValuePair<string, object>>)logRecord.State;
foreach (var kvp in state)
{
}
});
}
}
}
#endif

View File

@ -0,0 +1,278 @@
// <copyright file="LogRecordSharedPoolTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace OpenTelemetry.Logs.Tests
{
public sealed class LogRecordSharedPoolTests
{
[Fact]
public void ResizeTests()
{
LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize);
Assert.NotNull(LogRecordSharedPool.Current);
Assert.Equal(LogRecordSharedPool.DefaultMaxPoolSize, LogRecordSharedPool.Current.Capacity);
Assert.Throws<ArgumentOutOfRangeException>(() => LogRecordSharedPool.Resize(0));
var beforePool = LogRecordSharedPool.Current;
LogRecordSharedPool.Resize(1);
Assert.NotNull(LogRecordSharedPool.Current);
Assert.Equal(1, LogRecordSharedPool.Current.Capacity);
Assert.NotEqual(beforePool, LogRecordSharedPool.Current);
}
[Fact]
public void RentReturnTests()
{
LogRecordSharedPool.Resize(2);
var pool = LogRecordSharedPool.Current;
var logRecord1 = pool.Rent();
Assert.NotNull(logRecord1);
var logRecord2 = pool.Rent();
Assert.NotNull(logRecord1);
pool.Return(logRecord1);
Assert.Equal(1, pool.Count);
// Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue.
LogRecord manualRecord = new();
Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount);
pool.Return(manualRecord);
Assert.Equal(1, pool.Count);
pool.Return(logRecord2);
Assert.Equal(2, pool.Count);
logRecord1 = pool.Rent();
Assert.NotNull(logRecord1);
Assert.Equal(1, pool.Count);
logRecord2 = pool.Rent();
Assert.NotNull(logRecord2);
Assert.Equal(0, pool.Count);
var logRecord3 = pool.Rent();
var logRecord4 = pool.Rent();
Assert.NotNull(logRecord3);
Assert.NotNull(logRecord4);
pool.Return(logRecord1);
pool.Return(logRecord2);
pool.Return(logRecord3);
pool.Return(logRecord4); // <- Discarded due to pool size of 2
Assert.Equal(2, pool.Count);
}
[Fact]
public void TrackReferenceTests()
{
LogRecordSharedPool.Resize(2);
var pool = LogRecordSharedPool.Current;
var logRecord1 = pool.Rent();
Assert.NotNull(logRecord1);
Assert.Equal(1, logRecord1.PoolReferenceCount);
logRecord1.AddReference();
Assert.Equal(2, logRecord1.PoolReferenceCount);
pool.Return(logRecord1);
Assert.Equal(1, logRecord1.PoolReferenceCount);
pool.Return(logRecord1);
Assert.Equal(1, pool.Count);
Assert.Equal(0, logRecord1.PoolReferenceCount);
pool.Return(logRecord1);
Assert.Equal(-1, logRecord1.PoolReferenceCount);
Assert.Equal(1, pool.Count); // Record was not returned because PoolReferences was negative.
}
[Fact]
public void ClearTests()
{
LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize);
var pool = LogRecordSharedPool.Current;
var logRecord1 = pool.Rent();
logRecord1.AttributeStorage = new List<KeyValuePair<string, object?>>(16)
{
new KeyValuePair<string, object?>("key1", "value1"),
new KeyValuePair<string, object?>("key2", "value2"),
};
logRecord1.BufferedScopes = new List<object?>(8) { null, null };
pool.Return(logRecord1);
Assert.Empty(logRecord1.AttributeStorage);
Assert.Equal(16, logRecord1.AttributeStorage.Capacity);
Assert.Empty(logRecord1.BufferedScopes);
Assert.Equal(8, logRecord1.BufferedScopes.Capacity);
logRecord1 = pool.Rent();
Assert.NotNull(logRecord1.AttributeStorage);
Assert.NotNull(logRecord1.BufferedScopes);
for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfAttributes; i++)
{
logRecord1.AttributeStorage!.Add(new KeyValuePair<string, object?>("key", "value"));
}
for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfScopes; i++)
{
logRecord1.BufferedScopes!.Add(null);
}
pool.Return(logRecord1);
Assert.Null(logRecord1.AttributeStorage);
Assert.Null(logRecord1.BufferedScopes);
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ExportTest(bool warmup)
{
LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize);
var pool = LogRecordSharedPool.Current;
if (warmup)
{
for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++)
{
pool.Return(new LogRecord { PoolReferenceCount = 1 });
}
}
using BatchLogRecordExportProcessor processor = new(new NoopExporter());
List<Task> tasks = new();
for (int i = 0; i < Environment.ProcessorCount; i++)
{
tasks.Add(Task.Run(async () =>
{
Random random = new Random();
await Task.Delay(random.Next(100, 150)).ConfigureAwait(false);
for (int i = 0; i < 1000; i++)
{
var logRecord = pool.Rent();
processor.OnEnd(logRecord);
// This should no-op mostly.
pool.Return(logRecord);
await Task.Delay(random.Next(0, 20)).ConfigureAwait(false);
}
}));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
processor.ForceFlush();
if (warmup)
{
Assert.Equal(LogRecordSharedPool.DefaultMaxPoolSize, pool.Count);
}
Assert.True(pool.Count <= LogRecordSharedPool.DefaultMaxPoolSize);
}
[Fact]
public async Task DeadlockTest()
{
/*
* The way the LogRecordPool works is it maintains two counters one
* for readers and one for writers. The counters always increment
* and point to an index in the pool array by way of a modulus on
* the size of the array (index = counter % capacity). Under very
* heavy load it is possible for a reader to receive an index and
* then be yielded. When waking up that index may no longer be valid
* if other threads caused the counters to loop around. There is
* protection for this case in the pool, this test verifies it is
* working.
*
* This is considered a corner case. Many threads have to be renting
* & returning logs in a tight loop for this to happen. Real
* applications should be logging based on logic firing which should
* have more natural back-off time.
*/
LogRecordSharedPool.Resize(LogRecordSharedPool.DefaultMaxPoolSize);
var pool = LogRecordSharedPool.Current;
List<Task> tasks = new();
for (int i = 0; i < Environment.ProcessorCount; i++)
{
tasks.Add(Task.Run(async () =>
{
await Task.Delay(2000).ConfigureAwait(false);
for (int i = 0; i < 100_000; i++)
{
var logRecord = pool.Rent();
pool.Return(logRecord);
}
}));
}
await Task.WhenAll(tasks).ConfigureAwait(false);
Assert.True(pool.Count <= LogRecordSharedPool.DefaultMaxPoolSize);
}
private sealed class NoopExporter : BaseExporter<LogRecord>
{
public override ExportResult Export(in Batch<LogRecord> batch)
{
return ExportResult.Success;
}
}
}
}

View File

@ -256,8 +256,7 @@ namespace OpenTelemetry.Logs.Tests
using var loggerFactory = InitializeLoggerFactory(out List<LogRecord> exportedItems, configure: null);
var logger = loggerFactory.CreateLogger<LogRecordTest>();
var message = $"This does not matter.";
logger.LogInformation(message);
logger.LogInformation("This does not matter.");
var logRecord = exportedItems[0];
logRecord.State = "newState";
@ -744,6 +743,34 @@ namespace OpenTelemetry.Logs.Tests
Assert.Same(state, actualState.Value);
}
[Fact]
public void DisposingStateTest()
{
using var loggerFactory = InitializeLoggerFactory(out List<LogRecord> exportedItems, configure: options => options.ParseStateValues = true);
var logger = loggerFactory.CreateLogger<LogRecordTest>();
DisposingState state = new DisposingState("Hello world");
logger.Log(
LogLevel.Information,
0,
state,
null,
(s, e) => "OpenTelemetry!");
var logRecord = exportedItems[0];
state.Dispose();
Assert.Null(logRecord.State);
Assert.NotNull(logRecord.StateValues);
Assert.Equal(1, logRecord.StateValues.Count);
KeyValuePair<string, object> actualState = logRecord.StateValues[0];
Assert.Same("Value", actualState.Key);
Assert.Same("Hello world", actualState.Value);
}
private static ILoggerFactory InitializeLoggerFactory(out List<LogRecord> exportedItems, Action<OpenTelemetryLoggerOptions> configure = null)
{
var items = exportedItems = new List<LogRecord>();
@ -790,6 +817,54 @@ namespace OpenTelemetry.Logs.Tests
}
}
internal sealed class DisposingState : IReadOnlyList<KeyValuePair<string, object>>, IDisposable
{
private string value;
private bool disposed;
public DisposingState(string value)
{
this.Value = value;
}
public int Count => 1;
public string Value
{
get
{
if (this.disposed)
{
throw new ObjectDisposedException(nameof(DisposingState));
}
return this.value;
}
private set => this.value = value;
}
public KeyValuePair<string, object> this[int index] => index switch
{
0 => new KeyValuePair<string, object>(nameof(this.Value), this.Value),
_ => throw new IndexOutOfRangeException(nameof(index)),
};
public void Dispose()
{
this.disposed = true;
}
public IEnumerator<KeyValuePair<string, object>> GetEnumerator()
{
for (var i = 0; i < this.Count; i++)
{
yield return this[i];
}
}
IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
}
private class RedactionProcessor : BaseProcessor<LogRecord>
{
private readonly Field fieldToUpdate;

View File

@ -0,0 +1,90 @@
// <copyright file="LogRecordThreadStaticPoolTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
#nullable enable
using System.Collections.Generic;
using Xunit;
namespace OpenTelemetry.Logs.Tests
{
public sealed class LogRecordThreadStaticPoolTests
{
[Fact]
public void RentReturnTests()
{
LogRecordThreadStaticPool.Storage = null;
var logRecord = LogRecordThreadStaticPool.Instance.Rent();
Assert.NotNull(logRecord);
Assert.Null(LogRecordThreadStaticPool.Storage);
LogRecordThreadStaticPool.Instance.Return(logRecord);
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);
LogRecordThreadStaticPool.Instance.Return(new());
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);
LogRecordThreadStaticPool.Storage = null;
var manual = new LogRecord();
LogRecordThreadStaticPool.Instance.Return(manual);
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(manual, LogRecordThreadStaticPool.Storage);
}
[Fact]
public void ClearTests()
{
var logRecord1 = LogRecordThreadStaticPool.Instance.Rent();
logRecord1.AttributeStorage = new List<KeyValuePair<string, object?>>(16)
{
new KeyValuePair<string, object?>("key1", "value1"),
new KeyValuePair<string, object?>("key2", "value2"),
};
logRecord1.BufferedScopes = new List<object?>(8) { null, null };
LogRecordThreadStaticPool.Instance.Return(logRecord1);
Assert.Empty(logRecord1.AttributeStorage);
Assert.Equal(16, logRecord1.AttributeStorage.Capacity);
Assert.Empty(logRecord1.BufferedScopes);
Assert.Equal(8, logRecord1.BufferedScopes.Capacity);
logRecord1 = LogRecordThreadStaticPool.Instance.Rent();
Assert.NotNull(logRecord1.AttributeStorage);
Assert.NotNull(logRecord1.BufferedScopes);
for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfAttributes; i++)
{
logRecord1.AttributeStorage!.Add(new KeyValuePair<string, object?>("key", "value"));
}
for (int i = 0; i <= LogRecordPoolHelper.DefaultMaxNumberOfScopes; i++)
{
logRecord1.BufferedScopes!.Add(null);
}
LogRecordThreadStaticPool.Instance.Return(logRecord1);
Assert.Null(logRecord1.AttributeStorage);
Assert.Null(logRecord1.BufferedScopes);
}
}
}

View File

@ -85,5 +85,65 @@ namespace OpenTelemetry.Logs.Tests
Assert.Single(exportedItems);
}
[Fact]
public void ThreadStaticPoolUsedByProviderTests()
{
using var provider1 = new OpenTelemetryLoggerProvider(new OpenTelemetryLoggerOptions());
Assert.Equal(LogRecordThreadStaticPool.Instance, provider1.LogRecordPool);
var options = new OpenTelemetryLoggerOptions();
options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter()));
using var provider2 = new OpenTelemetryLoggerProvider(options);
Assert.Equal(LogRecordThreadStaticPool.Instance, provider2.LogRecordPool);
options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter()));
using var provider3 = new OpenTelemetryLoggerProvider(options);
Assert.Equal(LogRecordThreadStaticPool.Instance, provider3.LogRecordPool);
}
[Fact]
public void SharedPoolUsedByProviderTests()
{
var options = new OpenTelemetryLoggerOptions();
options.AddProcessor(new BatchLogRecordExportProcessor(new NoopExporter()));
using var provider1 = new OpenTelemetryLoggerProvider(options);
Assert.Equal(LogRecordSharedPool.Current, provider1.LogRecordPool);
options = new OpenTelemetryLoggerOptions();
options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter()));
options.AddProcessor(new BatchLogRecordExportProcessor(new NoopExporter()));
using var provider2 = new OpenTelemetryLoggerProvider(options);
Assert.Equal(LogRecordSharedPool.Current, provider2.LogRecordPool);
options = new OpenTelemetryLoggerOptions();
options.AddProcessor(new SimpleLogRecordExportProcessor(new NoopExporter()));
options.AddProcessor(new CompositeProcessor<LogRecord>(new BaseProcessor<LogRecord>[]
{
new SimpleLogRecordExportProcessor(new NoopExporter()),
new BatchLogRecordExportProcessor(new NoopExporter()),
}));
using var provider3 = new OpenTelemetryLoggerProvider(options);
Assert.Equal(LogRecordSharedPool.Current, provider3.LogRecordPool);
}
private sealed class NoopExporter : BaseExporter<LogRecord>
{
public override ExportResult Export(in Batch<LogRecord> batch)
{
return ExportResult.Success;
}
}
}
}