172 lines
4.9 KiB
C++
172 lines
4.9 KiB
C++
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <stddef.h>
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include <cassert>
|
|
#include <cstdint>
|
|
#include <functional>
|
|
#include <initializer_list>
|
|
#include <memory>
|
|
#include <random>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include "opentelemetry/sdk/common/atomic_unique_ptr.h"
|
|
#include "opentelemetry/sdk/common/circular_buffer.h"
|
|
#include "opentelemetry/sdk/common/circular_buffer_range.h"
|
|
|
|
using opentelemetry::sdk::common::AtomicUniquePtr;
|
|
using opentelemetry::sdk::common::CircularBuffer;
|
|
using opentelemetry::sdk::common::CircularBufferRange;
|
|
|
|
static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()};
|
|
|
|
static void GenerateRandomNumbers(CircularBuffer<uint32_t> &buffer,
|
|
std::vector<uint32_t> &numbers,
|
|
int n)
|
|
{
|
|
for (int i = 0; i < n; ++i)
|
|
{
|
|
auto value = static_cast<uint32_t>(RandomNumberGenerator());
|
|
std::unique_ptr<uint32_t> x{new uint32_t{value}};
|
|
if (buffer.Add(x))
|
|
{
|
|
numbers.push_back(value);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void RunNumberProducers(CircularBuffer<uint32_t> &buffer,
|
|
std::vector<uint32_t> &numbers,
|
|
int num_threads,
|
|
int n)
|
|
{
|
|
std::vector<std::vector<uint32_t>> thread_numbers(num_threads);
|
|
std::vector<std::thread> threads(num_threads);
|
|
for (int thread_index = 0; thread_index < num_threads; ++thread_index)
|
|
{
|
|
threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer),
|
|
std::ref(thread_numbers[thread_index]), n};
|
|
}
|
|
for (auto &thread : threads)
|
|
{
|
|
thread.join();
|
|
}
|
|
for (int thread_index = 0; thread_index < num_threads; ++thread_index)
|
|
{
|
|
numbers.insert(numbers.end(), thread_numbers[thread_index].begin(),
|
|
thread_numbers[thread_index].end());
|
|
}
|
|
}
|
|
|
|
void RunNumberConsumer(CircularBuffer<uint32_t> &buffer,
|
|
std::atomic<bool> &exit,
|
|
std::vector<uint32_t> &numbers)
|
|
{
|
|
while (true)
|
|
{
|
|
if (exit && buffer.Peek().empty())
|
|
{
|
|
return;
|
|
}
|
|
auto n = std::uniform_int_distribution<size_t>{0, buffer.Peek().size()}(RandomNumberGenerator);
|
|
buffer.Consume(n, [&](CircularBufferRange<AtomicUniquePtr<uint32_t>> range) noexcept {
|
|
assert(range.size() == n);
|
|
range.ForEach([&](AtomicUniquePtr<uint32_t> &ptr) noexcept {
|
|
assert(!ptr.IsNull());
|
|
numbers.push_back(*ptr);
|
|
ptr.Reset();
|
|
return true;
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
TEST(CircularBufferTest, Add)
|
|
{
|
|
CircularBuffer<int> buffer{10};
|
|
|
|
std::unique_ptr<int> x{new int{11}};
|
|
EXPECT_TRUE(buffer.Add(x));
|
|
EXPECT_EQ(x, nullptr);
|
|
auto range = buffer.Peek();
|
|
EXPECT_EQ(range.size(), 1);
|
|
range.ForEach([](const AtomicUniquePtr<int> &y) {
|
|
EXPECT_EQ(*y, 11);
|
|
return true;
|
|
});
|
|
}
|
|
|
|
TEST(CircularBufferTest, Clear)
|
|
{
|
|
CircularBuffer<int> buffer{10};
|
|
|
|
std::unique_ptr<int> x{new int{11}};
|
|
EXPECT_TRUE(buffer.Add(x));
|
|
EXPECT_EQ(x, nullptr);
|
|
buffer.Clear();
|
|
EXPECT_TRUE(buffer.empty());
|
|
}
|
|
|
|
TEST(CircularBufferTest, AddOnFull)
|
|
{
|
|
CircularBuffer<int> buffer{10};
|
|
for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i)
|
|
{
|
|
std::unique_ptr<int> x{new int{i}};
|
|
EXPECT_TRUE(buffer.Add(x));
|
|
}
|
|
std::unique_ptr<int> x{new int{33}};
|
|
EXPECT_FALSE(buffer.Add(x));
|
|
EXPECT_NE(x, nullptr);
|
|
EXPECT_EQ(*x, 33);
|
|
}
|
|
|
|
TEST(CircularBufferTest, Consume)
|
|
{
|
|
CircularBuffer<int> buffer{10};
|
|
for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i)
|
|
{
|
|
std::unique_ptr<int> x{new int{i}};
|
|
EXPECT_TRUE(buffer.Add(x));
|
|
}
|
|
int count = 0;
|
|
buffer.Consume(5, [&](CircularBufferRange<AtomicUniquePtr<int>> range) noexcept {
|
|
range.ForEach([&](AtomicUniquePtr<int> &ptr) {
|
|
EXPECT_EQ(*ptr, count++);
|
|
ptr.Reset();
|
|
return true;
|
|
});
|
|
});
|
|
EXPECT_EQ(count, 5);
|
|
}
|
|
|
|
TEST(CircularBufferTest, Simulation)
|
|
{
|
|
const int num_producer_threads = 4;
|
|
const int n = 25000;
|
|
for (size_t max_size : {1, 2, 10, 50, 100, 1000})
|
|
{
|
|
CircularBuffer<uint32_t> buffer{max_size};
|
|
std::vector<uint32_t> producer_numbers;
|
|
std::vector<uint32_t> consumer_numbers;
|
|
auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers),
|
|
num_producer_threads, n};
|
|
std::atomic<bool> exit{false};
|
|
auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit),
|
|
std::ref(consumer_numbers)};
|
|
producers.join();
|
|
exit = true;
|
|
consumer.join();
|
|
std::sort(producer_numbers.begin(), producer_numbers.end());
|
|
std::sort(consumer_numbers.begin(), consumer_numbers.end());
|
|
|
|
EXPECT_EQ(producer_numbers.size(), consumer_numbers.size());
|
|
EXPECT_EQ(producer_numbers, consumer_numbers);
|
|
}
|
|
}
|