// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <cstddef>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "opentelemetry/nostd/shared_ptr.h"
|
|
#include "opentelemetry/nostd/span.h"
|
|
#include "opentelemetry/nostd/string_view.h"
|
|
#include "opentelemetry/sdk/common/attribute_utils.h"
|
|
#include "opentelemetry/sdk/common/exporter_utils.h"
|
|
#include "opentelemetry/sdk/common/global_log_handler.h"
|
|
#include "opentelemetry/sdk/trace/batch_span_processor.h"
|
|
#include "opentelemetry/sdk/trace/batch_span_processor_options.h"
|
|
#include "opentelemetry/sdk/trace/exporter.h"
|
|
#include "opentelemetry/sdk/trace/processor.h"
|
|
#include "opentelemetry/sdk/trace/recordable.h"
|
|
#include "opentelemetry/sdk/trace/span_data.h"
|
|
#include "opentelemetry/version.h"
|
|
|
|
OPENTELEMETRY_BEGIN_NAMESPACE
|
|
|
|
/**
|
|
* Returns a mock span exporter meant exclusively for testing only
|
|
*/
|
|
class MockSpanExporter final : public sdk::trace::SpanExporter
|
|
{
|
|
public:
|
|
MockSpanExporter(
|
|
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
|
|
std::shared_ptr<std::atomic<std::size_t>> shut_down_counter,
|
|
std::shared_ptr<std::atomic<bool>> is_shutdown,
|
|
std::shared_ptr<std::atomic<bool>> is_export_completed =
|
|
std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)),
|
|
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
|
|
: spans_received_(std::move(spans_received)),
|
|
shut_down_counter_(std::move(shut_down_counter)),
|
|
is_shutdown_(std::move(is_shutdown)),
|
|
is_export_completed_(std::move(is_export_completed)),
|
|
export_delay_(export_delay)
|
|
{}
|
|
|
|
std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override
|
|
{
|
|
return std::unique_ptr<sdk::trace::Recordable>(new sdk::trace::SpanData);
|
|
}
|
|
|
|
sdk::common::ExportResult Export(
|
|
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
|
|
{
|
|
*is_export_completed_ = false;
|
|
|
|
std::this_thread::sleep_for(export_delay_);
|
|
|
|
for (auto &recordable : recordables)
|
|
{
|
|
auto span = std::unique_ptr<sdk::trace::SpanData>(
|
|
static_cast<sdk::trace::SpanData *>(recordable.release()));
|
|
|
|
if (span != nullptr)
|
|
{
|
|
spans_received_->push_back(std::move(span));
|
|
}
|
|
}
|
|
|
|
*is_export_completed_ = true;
|
|
return sdk::common::ExportResult::kSuccess;
|
|
}
|
|
|
|
bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override
|
|
{
|
|
++(*shut_down_counter_);
|
|
return true;
|
|
}
|
|
|
|
bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
|
|
{
|
|
*is_shutdown_ = true;
|
|
return true;
|
|
}
|
|
|
|
bool IsExportCompleted() { return is_export_completed_->load(); }
|
|
|
|
private:
|
|
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received_;
|
|
std::shared_ptr<std::atomic<std::size_t>> shut_down_counter_;
|
|
std::shared_ptr<std::atomic<bool>> is_shutdown_;
|
|
std::shared_ptr<std::atomic<bool>> is_export_completed_;
|
|
// Meant exclusively to test force flush timeout
|
|
const std::chrono::milliseconds export_delay_;
|
|
};
|
|
|
|
/**
|
|
* Fixture Class
|
|
*/
|
|
class BatchSpanProcessorTestPeer : public testing::Test
|
|
{
|
|
public:
|
|
std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> GetTestSpans(
|
|
const std::shared_ptr<sdk::trace::SpanProcessor> &processor,
|
|
const int num_spans)
|
|
{
|
|
std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> test_spans(
|
|
new std::vector<std::unique_ptr<sdk::trace::Recordable>>);
|
|
|
|
for (int i = 0; i < num_spans; ++i)
|
|
{
|
|
test_spans->push_back(processor->MakeRecordable());
|
|
static_cast<sdk::trace::SpanData *>(test_spans->at(i).get())
|
|
->SetName("Span " + std::to_string(i));
|
|
}
|
|
|
|
return test_spans;
|
|
}
|
|
};
|
|
|
|
/* ################################## TESTS ############################################ */
|
|
|
|
// Verifies that Shutdown() delivers every span handed to OnEnd() to the exporter, in
// submission order, shuts the exporter down, and can safely be called a second time.
TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
{
  std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
      new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

  auto batch_processor =
      std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
          std::unique_ptr<sdk::trace::SpanExporter>(
              new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
          sdk::trace::BatchSpanProcessorOptions()));

  const int num_spans = 3;
  // Unsigned copy so comparisons against vector::size() do not mix signedness.
  const std::size_t expected_count = static_cast<std::size_t>(num_spans);

  auto test_spans = GetTestSpans(batch_processor, num_spans);

  for (auto &span : *test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  EXPECT_TRUE(batch_processor->Shutdown());
  // It's safe to shutdown again
  EXPECT_TRUE(batch_processor->Shutdown());

  // Checked before the fatal size assertion so it is reported even if that one fails.
  EXPECT_TRUE(is_shutdown->load());

  // Fatal on mismatch: the per-element checks below index the vector with at(), which
  // would throw std::out_of_range on a short vector and hide the real failure.
  ASSERT_EQ(expected_count, spans_received->size());
  for (std::size_t i = 0; i < expected_count; ++i)
  {
    EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  }
}
|
|
|
|
// Verifies that ForceFlush() pushes all pending spans to the exporter in submission
// order, forwards the flush to the exporter, and leaves the processor usable for a
// second round of spans.
TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
{
  // NOTE: despite its name, MockSpanExporter increments this counter in ForceFlush().
  std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
      new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

  auto batch_processor =
      std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
          std::unique_ptr<sdk::trace::SpanExporter>(
              new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
          sdk::trace::BatchSpanProcessorOptions()));

  const int num_spans = 2048;
  // Unsigned copy so comparisons against vector::size() do not mix signedness.
  const std::size_t expected_count = static_cast<std::size_t>(num_spans);

  auto test_spans = GetTestSpans(batch_processor, num_spans);

  for (auto &span : *test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  // Give some time to export
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  EXPECT_TRUE(batch_processor->ForceFlush());
  EXPECT_GE(shut_down_counter->load(), std::size_t{1});

  // Fatal on mismatch: the loop below indexes with at(), which would throw
  // std::out_of_range on a short vector and hide the real failure.
  ASSERT_EQ(expected_count, spans_received->size());
  for (std::size_t i = 0; i < expected_count; ++i)
  {
    EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  }

  // Create some more spans to make sure that the processor still works
  auto more_test_spans = GetTestSpans(batch_processor, num_spans);
  for (auto &span : *more_test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  // Give some time to export the spans
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  const std::size_t flushes_before = shut_down_counter->load();
  EXPECT_TRUE(batch_processor->ForceFlush());
  EXPECT_GT(shut_down_counter->load(), flushes_before);

  // Same guard as above before indexing into the second half of the vector.
  ASSERT_EQ(expected_count * 2, spans_received->size());
  for (std::size_t i = 0; i < expected_count; ++i)
  {
    // The second batch reuses the names "Span 0" .. "Span <num_spans - 1>".
    EXPECT_EQ("Span " + std::to_string(i), spans_received->at(expected_count + i)->GetName());
  }
}
|
|
|
|
// A mock log handler to check whether log messages with a specific level were emitted.
|
|
struct MockLogHandler : public sdk::common::internal_log::LogHandler
|
|
{
|
|
using Message = std::pair<sdk::common::internal_log::LogLevel, std::string>;
|
|
|
|
void Handle(sdk::common::internal_log::LogLevel level,
|
|
const char * /*file*/,
|
|
int /*line*/,
|
|
const char *msg,
|
|
const sdk::common::AttributeMap & /*attributes*/) noexcept override
|
|
{
|
|
messages.emplace_back(level, msg);
|
|
}
|
|
|
|
std::vector<Message> messages;
|
|
};
|
|
|
|
// Submits a large burst of spans to a processor built with default options and checks
// that no more spans than were submitted reach the exporter. If fewer arrive, some were
// dropped, and a "queue is full" warning must have been logged.
TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
{
  /* Test that when exporting more than max_queue_size spans, some are most likely lost*/

  // Set up a log handler to verify a warning is generated.
  auto log_handler = nostd::shared_ptr<sdk::common::internal_log::LogHandler>(new MockLogHandler());
  sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler);

  std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
      new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

  // Number of spans submitted. The processor uses default options, so this is meant to
  // exceed its queue capacity.
  // NOTE(review): confirm the default max_queue_size is below this value.
  const int num_spans = 4096;
  // Unsigned copy so comparisons against vector::size() do not mix signedness.
  const std::size_t submitted_count = static_cast<std::size_t>(num_spans);

  auto batch_processor =
      std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
          std::unique_ptr<sdk::trace::SpanExporter>(
              new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
          sdk::trace::BatchSpanProcessorOptions()));

  auto test_spans = GetTestSpans(batch_processor, num_spans);

  for (auto &span : *test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  // Give some time to export the spans
  std::this_thread::sleep_for(std::chrono::milliseconds(700));

  EXPECT_TRUE(batch_processor->ForceFlush());

  // Span should be exported by now
  // Only non-fatal assertions are used in this test so that the default log handler is
  // always reinstated at the end.
  EXPECT_GE(submitted_count, spans_received->size());

  // If we haven't received all spans, some must have dropped, verify a warning was logged.
  // Only do this when the log level is warning or above.
#if OTEL_INTERNAL_LOG_LEVEL >= OTEL_INTERNAL_LOG_LEVEL_WARN
  if (submitted_count > spans_received->size())
  {
    auto &messages = static_cast<MockLogHandler *>(log_handler.get())->messages;
    const MockLogHandler::Message expected_warning(
        sdk::common::internal_log::LogLevel::Warning,
        "BatchSpanProcessor queue is full - dropping span.");
    EXPECT_TRUE(std::find(messages.begin(), messages.end(), expected_warning) !=
                messages.end());
  }
#endif

  // Reinstate the default log handler.
  sdk::common::internal_log::GlobalLogHandler::SetLogHandler(
      nostd::shared_ptr<sdk::common::internal_log::LogHandler>(
          new sdk::common::internal_log::DefaultLogHandler()));
}
|
|
|
|
// Submits 2048 spans to a processor built with default options and checks that every one
// of them reaches the exporter, in submission order, after a ForceFlush().
TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
{
  /* Test that no spans are lost when sending max_queue_size spans */

  std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
      new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

  // NOTE(review): presumably equal to the default max_queue_size - confirm.
  const int num_spans = 2048;
  // Unsigned copy so comparisons against vector::size() do not mix signedness.
  const std::size_t expected_count = static_cast<std::size_t>(num_spans);

  auto batch_processor =
      std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
          std::unique_ptr<sdk::trace::SpanExporter>(
              new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
          sdk::trace::BatchSpanProcessorOptions()));

  auto test_spans = GetTestSpans(batch_processor, num_spans);

  for (auto &span : *test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  // Give some time to export the spans
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  EXPECT_TRUE(batch_processor->ForceFlush());

  // Fatal on mismatch: the loop below indexes with at(), which would throw
  // std::out_of_range on a short vector and hide the real failure.
  ASSERT_EQ(expected_count, spans_received->size());
  for (std::size_t i = 0; i < expected_count; ++i)
  {
    EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  }
}
|
|
|
|
// Submits one batch worth of spans, waits for the configured schedule delay plus a small
// margin, and checks that the exporter has received all of them without an explicit flush.
TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
{
  /* Test that max_export_batch_size spans are exported every schedule_delay_millis
     milliseconds */

  std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
      new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  const std::chrono::milliseconds export_delay(0);
  // Number of spans submitted. It is not written into `options`.
  // NOTE(review): presumably matches the default max_export_batch_size - confirm.
  const std::size_t max_export_batch_size = 512;
  sdk::trace::BatchSpanProcessorOptions options{};
  options.schedule_delay_millis = std::chrono::milliseconds(2000);

  auto batch_processor =
      std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
          std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(
              spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)),
          options));

  // GetTestSpans() takes an int count; make the narrowing explicit.
  auto test_spans = GetTestSpans(batch_processor, static_cast<int>(max_export_batch_size));

  for (auto &span : *test_spans)
  {
    batch_processor->OnEnd(std::move(span));
  }

  // Sleep for schedule_delay_millis milliseconds
  std::this_thread::sleep_for(options.schedule_delay_millis);

  // small delay to give time to export
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  // Spans should be exported by now
  EXPECT_TRUE(is_export_completed->load());

  // Fatal on mismatch: the loop below indexes with at(), which would throw
  // std::out_of_range on a short vector and hide the real failure.
  ASSERT_EQ(max_export_batch_size, spans_received->size());
  for (std::size_t i = 0; i < max_export_batch_size; ++i)
  {
    EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  }
}
|
|
|
|
OPENTELEMETRY_END_NAMESPACE
|