diff --git a/test/OpenTelemetry.Tests.Stress/Skeleton.cs b/test/OpenTelemetry.Tests.Stress/Skeleton.cs index 8381eddbe..7acd5b9d7 100644 --- a/test/OpenTelemetry.Tests.Stress/Skeleton.cs +++ b/test/OpenTelemetry.Tests.Stress/Skeleton.cs @@ -94,8 +94,7 @@ public partial class Program .Build() : null; var statistics = new long[concurrency]; - var watchForTotal = new Stopwatch(); - watchForTotal.Start(); + var watchForTotal = Stopwatch.StartNew(); Parallel.Invoke( () => diff --git a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs index edf65d9d7..d89b748d5 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricAPITest.cs @@ -510,118 +510,43 @@ namespace OpenTelemetry.Metrics.Tests [Fact] public void MultithreadedLongCounterTest() { - var exportedItems = new List(); - - using var meter = new Meter(Utils.GetCurrentMethodName()); - var counterLong = meter.CreateCounter("mycounter"); - using var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(meter.Name) - .AddReader(new BaseExportingMetricReader(new InMemoryExporter(exportedItems)) - { - Temporality = AggregationTemporality.Cumulative, - }) - .Build(); - - // setup args to threads. - var mreToBlockUpdateThreads = new ManualResetEvent(false); - var mreToEnsureAllThreadsStarted = new ManualResetEvent(false); - - var argToThread = new UpdateThreadArguments(); - argToThread.DeltaValueUpdatedByEachCall = deltaLongValueUpdatedByEachCall; - argToThread.Counter = counterLong; - argToThread.ThreadsStartedCount = 0; - argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads; - argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted; - - Thread[] t = new Thread[numberOfThreads]; - for (int i = 0; i < numberOfThreads; i++) - { - t[i] = new Thread(CounterUpdateThread); - t[i].Start(argToThread); - } - - // Block until all threads started. - mreToEnsureAllThreadsStarted.WaitOne(); - - Stopwatch sw = Stopwatch.StartNew(); - - // unblock all the threads. - // (i.e let them start counter.Add) - mreToBlockUpdateThreads.Set(); - - for (int i = 0; i < numberOfThreads; i++) - { - // wait for all threads to complete - t[i].Join(); - } - - var timeTakenInMilliseconds = sw.ElapsedMilliseconds; - this.output.WriteLine($"Took {timeTakenInMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings."); - - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - - var sumReceived = GetLongSum(exportedItems); - var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads; - Assert.Equal(expectedSum, sumReceived); + this.MultithreadedCounterTest(deltaLongValueUpdatedByEachCall); } [Fact] public void MultithreadedDoubleCounterTest() { - var exportedItems = new List(); + this.MultithreadedCounterTest(deltaDoubleValueUpdatedByEachCall); + } - using var meter = new Meter(Utils.GetCurrentMethodName()); - var counterDouble = meter.CreateCounter("mycounter"); - using var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(meter.Name) - .AddReader(new BaseExportingMetricReader(new InMemoryExporter(exportedItems)) - { - Temporality = AggregationTemporality.Cumulative, - }) - .Build(); - - // setup args to threads. - var mreToBlockUpdateThreads = new ManualResetEvent(false); - var mreToEnsureAllThreadsStarted = new ManualResetEvent(false); - - var argToThread = new UpdateThreadArguments(); - argToThread.DeltaValueUpdatedByEachCall = deltaDoubleValueUpdatedByEachCall; - argToThread.Counter = counterDouble; - argToThread.ThreadsStartedCount = 0; - argToThread.MreToBlockUpdateThread = mreToBlockUpdateThreads; - argToThread.MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted; - - Thread[] t = new Thread[numberOfThreads]; - for (int i = 0; i < numberOfThreads; i++) + [Fact] + public void MultithreadedLongHistogramTest() + { + var expected = new long[11]; + for (var i = 0; i < expected.Length; i++) { - t[i] = new Thread(CounterUpdateThread); - t[i].Start(argToThread); + expected[i] = numberOfThreads * numberOfMetricUpdateByEachThread; } - // Block until all threads started. - mreToEnsureAllThreadsStarted.WaitOne(); + // Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000 + var values = new long[] { -1, 1, 6, 20, 40, 60, 80, 200, 300, 600, 1001 }; - Stopwatch sw = Stopwatch.StartNew(); + this.MultithreadedHistogramTest(expected, values); + } - // unblock all the threads. - // (i.e let them start counter.Add) - mreToBlockUpdateThreads.Set(); - - for (int i = 0; i < numberOfThreads; i++) + [Fact] + public void MultithreadedDoubleHistogramTest() + { + var expected = new long[11]; + for (var i = 0; i < expected.Length; i++) { - // wait for all threads to complete - t[i].Join(); + expected[i] = numberOfThreads * numberOfMetricUpdateByEachThread; } - var timeTakenInMilliseconds = sw.ElapsedMilliseconds; - this.output.WriteLine($"Took {timeTakenInMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings."); + // Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000 + var values = new double[] { -1.0, 1.0, 6.0, 20.0, 40.0, 60.0, 80.0, 200.0, 300.0, 600.0, 1001.0 }; - meterProvider.ForceFlush(MaxTimeToAllowForFlush); - - var sumReceived = GetDoubleSum(exportedItems); - var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads; - var difference = Math.Abs(sumReceived - expectedSum); - Assert.True(difference <= 0.0001); + this.MultithreadedHistogramTest(expected, values); } [Theory] @@ -736,16 +661,15 @@ namespace OpenTelemetry.Metrics.Tests private static void CounterUpdateThread(object obj) where T : struct, IComparable { - var arguments = obj as UpdateThreadArguments; - if (arguments == null) + if (obj is not UpdateThreadArguments arguments) { throw new Exception("Invalid args"); } var mre = arguments.MreToBlockUpdateThread; var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart; - var counter = arguments.Counter; - var valueToUpdate = arguments.DeltaValueUpdatedByEachCall; + var counter = arguments.Instrument as Counter; + var valueToUpdate = arguments.ValuesToRecord[0]; if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads) { mreToEnsureAllThreadsStart.Set(); @@ -760,14 +684,149 @@ namespace OpenTelemetry.Metrics.Tests } } + private static void HistogramUpdateThread(object obj) + where T : struct, IComparable + { + if (obj is not UpdateThreadArguments arguments) + { + throw new Exception("Invalid args"); + } + + var mre = arguments.MreToBlockUpdateThread; + var mreToEnsureAllThreadsStart = arguments.MreToEnsureAllThreadsStart; + var histogram = arguments.Instrument as Histogram; + + if (Interlocked.Increment(ref arguments.ThreadsStartedCount) == numberOfThreads) + { + mreToEnsureAllThreadsStart.Set(); + } + + // Wait until signalled to start calling update on aggregator + mre.WaitOne(); + + for (int i = 0; i < numberOfMetricUpdateByEachThread; i++) + { + for (int j = 0; j < arguments.ValuesToRecord.Length; j++) + { + histogram.Record(arguments.ValuesToRecord[j]); + } + } + } + + private void MultithreadedCounterTest(T deltaValueUpdatedByEachCall) + where T : struct, IComparable + { + var metricItems = new List(); + var metricReader = new BaseExportingMetricReader(new InMemoryExporter(metricItems)); + + using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{typeof(T).Name}.{deltaValueUpdatedByEachCall}"); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddReader(metricReader) + .Build(); + + var argToThread = new UpdateThreadArguments + { + ValuesToRecord = new T[] { deltaValueUpdatedByEachCall }, + Instrument = meter.CreateCounter("counter"), + MreToBlockUpdateThread = new ManualResetEvent(false), + MreToEnsureAllThreadsStart = new ManualResetEvent(false), + }; + + Thread[] t = new Thread[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) + { + t[i] = new Thread(CounterUpdateThread); + t[i].Start(argToThread); + } + + argToThread.MreToEnsureAllThreadsStart.WaitOne(); + Stopwatch sw = Stopwatch.StartNew(); + argToThread.MreToBlockUpdateThread.Set(); + + for (int i = 0; i < numberOfThreads; i++) + { + t[i].Join(); + } + + this.output.WriteLine($"Took {sw.ElapsedMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread} recordings."); + + metricReader.Collect(); + + if (typeof(T) == typeof(long)) + { + var sumReceived = GetLongSum(metricItems); + var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads; + Assert.Equal(expectedSum, sumReceived); + } + else if (typeof(T) == typeof(double)) + { + var sumReceived = GetDoubleSum(metricItems); + var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads; + Assert.Equal(expectedSum, sumReceived, 2); + } + } + + private void MultithreadedHistogramTest(long[] expected, T[] values) + where T : struct, IComparable + { + var bucketCounts = new long[11]; + var metricReader = new BaseExportingMetricReader(new TestExporter(batch => + { + foreach (var metric in batch) + { + foreach (var metricPoint in metric.GetMetricPoints()) + { + bucketCounts = metricPoint.GetHistogramBuckets().BucketCounts; + } + } + })); + + using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{typeof(T).Name}"); + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(meter.Name) + .AddReader(metricReader) + .Build(); + + var argsToThread = new UpdateThreadArguments + { + Instrument = meter.CreateHistogram("histogram"), + MreToBlockUpdateThread = new ManualResetEvent(false), + MreToEnsureAllThreadsStart = new ManualResetEvent(false), + ValuesToRecord = values, + }; + + Thread[] t = new Thread[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) + { + t[i] = new Thread(HistogramUpdateThread); + t[i].Start(argsToThread); + } + + argsToThread.MreToEnsureAllThreadsStart.WaitOne(); + Stopwatch sw = Stopwatch.StartNew(); + argsToThread.MreToBlockUpdateThread.Set(); + + for (int i = 0; i < numberOfThreads; i++) + { + t[i].Join(); + } + + this.output.WriteLine($"Took {sw.ElapsedMilliseconds} msecs. Total threads: {numberOfThreads}, each thread doing {numberOfMetricUpdateByEachThread * values.Length} recordings."); + + metricReader.Collect(); + + Assert.Equal(expected, bucketCounts); + } + private class UpdateThreadArguments where T : struct, IComparable { public ManualResetEvent MreToBlockUpdateThread; public ManualResetEvent MreToEnsureAllThreadsStart; public int ThreadsStartedCount; - public Counter Counter; - public T DeltaValueUpdatedByEachCall; + public Instrument Instrument; + public T[] ValuesToRecord; } } }