Metrics Multithreaded Tests - Histogram (#2367)

This commit is contained in:
Michael Maxwell 2022-01-10 11:33:57 -08:00 committed by GitHub
parent 8c1c1495d9
commit 1ff1155852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 163 additions and 105 deletions

View File

@ -94,8 +94,7 @@ public partial class Program
.Build() : null;
var statistics = new long[concurrency];
var watchForTotal = new Stopwatch();
watchForTotal.Start();
var watchForTotal = Stopwatch.StartNew();
Parallel.Invoke(
() =>

View File

@ -510,118 +510,43 @@ namespace OpenTelemetry.Metrics.Tests
[Fact]
public void MultithreadedLongCounterTest()
{
var exportedItems = new List<Metric>();
using var meter = new Meter(Utils.GetCurrentMethodName());
var counterLong = meter.CreateCounter<long>("mycounter");
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddReader(new BaseExportingMetricReader(new InMemoryExporter<Metric>(exportedItems))
{
Temporality = AggregationTemporality.Cumulative,
})
.Build();
// setup args to threads.
var mreToBlockUpdateThreads = new ManualResetEvent(false);
var mreToEnsureAllThreadsStarted = new ManualResetEvent(false);
var argToThread = new UpdateThreadArguments<long>();
argToThread.DeltaValueUpdatedByEachCall = deltaLongValueUpdatedByEachCall;
argToThread.Counter = counterLong;
argToThread.ThreadsStartedCount = 0;
argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads;
argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted;
Thread[] t = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
t[i] = new Thread(CounterUpdateThread<long>);
t[i].Start(argToThread);
}
// Block until all threads started.
mreToEnsureAllThreadsStarted.WaitOne();
Stopwatch sw = Stopwatch.StartNew();
// unblock all the threads.
// (i.e let them start counter.Add)
mreToBlockUpdateThreads.Set();
for (int i = 0; i < numberOfThreads; i++)
{
// wait for all threads to complete
t[i].Join();
}
var timeTakenInMilliseconds = sw.ElapsedMilliseconds;
this.output.WriteLine($"Took {timeTakenInMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings.");
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
var sumReceived = GetLongSum(exportedItems);
var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads;
Assert.Equal(expectedSum, sumReceived);
this.MultithreadedCounterTest(deltaLongValueUpdatedByEachCall);
}
[Fact]
public void MultithreadedDoubleCounterTest()
{
var exportedItems = new List<Metric>();
using var meter = new Meter(Utils.GetCurrentMethodName());
var counterDouble = meter.CreateCounter<double>("mycounter");
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddReader(new BaseExportingMetricReader(new InMemoryExporter<Metric>(exportedItems))
{
Temporality = AggregationTemporality.Cumulative,
})
.Build();
// setup args to threads.
var mreToBlockUpdateThreads = new ManualResetEvent(false);
var mreToEnsureAllThreadsStarted = new ManualResetEvent(false);
var argToThread = new UpdateThreadArguments<double>();
argToThread.DeltaValueUpdatedByEachCall = deltaDoubleValueUpdatedByEachCall;
argToThread.Counter = counterDouble;
argToThread.ThreadsStartedCount = 0;
argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads;
argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted;
Thread[] t = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
t[i] = new Thread(CounterUpdateThread<double>);
t[i].Start(argToThread);
this.MultithreadedCounterTest(deltaDoubleValueUpdatedByEachCall);
}
// Block until all threads started.
mreToEnsureAllThreadsStarted.WaitOne();
Stopwatch sw = Stopwatch.StartNew();
// unblock all the threads.
// (i.e let them start counter.Add)
mreToBlockUpdateThreads.Set();
for (int i = 0; i < numberOfThreads; i++)
[Fact]
public void MultithreadedLongHistogramTest()
{
// wait for all threads to complete
t[i].Join();
var expected = new long[11];
for (var i = 0; i < expected.Length; i++)
{
expected[i] = numberOfThreads * numberOfMetricUpdateByEachThread;
}
var timeTakenInMilliseconds = sw.ElapsedMilliseconds;
this.output.WriteLine($"Took {timeTakenInMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings.");
// Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000
var values = new long[] { -1, 1, 6, 20, 40, 60, 80, 200, 300, 600, 1001 };
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
this.MultithreadedHistogramTest(expected, values);
}
var sumReceived = GetDoubleSum(exportedItems);
var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads;
var difference = Math.Abs(sumReceived - expectedSum);
Assert.True(difference <= 0.0001);
[Fact]
public void MultithreadedDoubleHistogramTest()
{
var expected = new long[11];
for (var i = 0; i < expected.Length; i++)
{
expected[i] = numberOfThreads * numberOfMetricUpdateByEachThread;
}
// Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000
var values = new double[] { -1.0, 1.0, 6.0, 20.0, 40.0, 60.0, 80.0, 200.0, 300.0, 600.0, 1001.0 };
this.MultithreadedHistogramTest(expected, values);
}
[Theory]
@ -736,16 +661,15 @@ namespace OpenTelemetry.Metrics.Tests
private static void CounterUpdateThread<T>(object obj)
where T : struct, IComparable
{
var arguments = obj as UpdateThreadArguments<T>;
if (arguments == null)
if (obj is not UpdateThreadArguments<T> arguments)
{
throw new Exception("Invalid args");
}
var mre = arguments.MreToBlockUpdateThread;
var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart;
var counter = arguments.Counter;
var valueToUpdate = arguments.DeltaValueUpdatedByEachCall;
var counter = arguments.Instrument as Counter<T>;
var valueToUpdate = arguments.ValuesToRecord[0];
if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads)
{
mreToEnsureAllThreadsStart.Set();
@ -760,14 +684,149 @@ namespace OpenTelemetry.Metrics.Tests
}
}
private static void HistogramUpdateThread<T>(object obj)
where T : struct, IComparable
{
if (obj is not UpdateThreadArguments<T> arguments)
{
throw new Exception("Invalid args");
}
var mre = arguments.MreToBlockUpdateThread;
var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart;
var histogram = arguments.Instrument as Histogram<T>;
if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads)
{
mreToEnsureAllThreadsStart.Set();
}
// Wait until signalled to start calling update on aggregator
mre.WaitOne();
for (int i = 0; i < numberOfMetricUpdateByEachThread; i++)
{
for (int j = 0; j < arguments.ValuesToRecord.Length; j++)
{
histogram.Record(arguments.ValuesToRecord[j]);
}
}
}
private void MultithreadedCounterTest<T>(T deltaValueUpdatedByEachCall)
where T : struct, IComparable
{
var metricItems = new List<Metric>();
var metricReader = new BaseExportingMetricReader(new InMemoryExporter<Metric>(metricItems));
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{typeof(T).Name}.{deltaValueUpdatedByEachCall}");
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddReader(metricReader)
.Build();
var argToThread = new UpdateThreadArguments<T>
{
ValuesToRecord = new T[] { deltaValueUpdatedByEachCall },
Instrument = meter.CreateCounter<T>("counter"),
MreToBlockUpdateThread = new ManualResetEvent(false),
MreToEnsureAllThreadsStart = new ManualResetEvent(false),
};
Thread[] t = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
t[i] = new Thread(CounterUpdateThread<T>);
t[i].Start(argToThread);
}
argToThread.MreToEnsureAllThreadsStart.WaitOne();
Stopwatch sw = Stopwatch.StartNew();
argToThread.MreToBlockUpdateThread.Set();
for (int i = 0; i < numberOfThreads; i++)
{
t[i].Join();
}
this.output.WriteLine($"Took {sw.ElapsedMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings.");
metricReader.Collect();
if (typeof(T) == typeof(long))
{
var sumReceived = GetLongSum(metricItems);
var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads;
Assert.Equal(expectedSum, sumReceived);
}
else if (typeof(T) == typeof(double))
{
var sumReceived = GetDoubleSum(metricItems);
var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads;
Assert.Equal(expectedSum, sumReceived, 2);
}
}
private void MultithreadedHistogramTest<T>(long[] expected, T[] values)
where T : struct, IComparable
{
var bucketCounts = new long[11];
var metricReader = new BaseExportingMetricReader(new TestExporter<Metric>(batch =>
{
foreach (var metric in batch)
{
foreach (var metricPoint in metric.GetMetricPoints())
{
bucketCounts = metricPoint.GetHistogramBuckets().BucketCounts;
}
}
}));
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{typeof(T).Name}");
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(meter.Name)
.AddReader(metricReader)
.Build();
var argsToThread = new UpdateThreadArguments<T>
{
Instrument = meter.CreateHistogram<T>("histogram"),
MreToBlockUpdateThread = new ManualResetEvent(false),
MreToEnsureAllThreadsStart = new ManualResetEvent(false),
ValuesToRecord = values,
};
Thread[] t = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
t[i] = new Thread(HistogramUpdateThread<T>);
t[i].Start(argsToThread);
}
argsToThread.MreToEnsureAllThreadsStart.WaitOne();
Stopwatch sw = Stopwatch.StartNew();
argsToThread.MreToBlockUpdateThread.Set();
for (int i = 0; i < numberOfThreads; i++)
{
t[i].Join();
}
this.output.WriteLine($"Took {sw.ElapsedMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread * values.Length} recordings.");
metricReader.Collect();
Assert.Equal(expected, bucketCounts);
}
private class UpdateThreadArguments<T>
where T : struct, IComparable
{
public ManualResetEvent MreToBlockUpdateThread;
public ManualResetEvent MreToEnsureAllThreadsStart;
public int ThreadsStartedCount;
public Counter<T> Counter;
public T DeltaValueUpdatedByEachCall;
public Instrument<T> Instrument;
public T[] ValuesToRecord;
}
}
}