// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 #include #include #include #include #include #include #include #include #include #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/span.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/sdk/common/attribute_utils.h" #include "opentelemetry/sdk/common/exporter_utils.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/trace/batch_span_processor.h" #include "opentelemetry/sdk/trace/batch_span_processor_options.h" #include "opentelemetry/sdk/trace/exporter.h" #include "opentelemetry/sdk/trace/processor.h" #include "opentelemetry/sdk/trace/recordable.h" #include "opentelemetry/sdk/trace/span_data.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE /** * Returns a mock span exporter meant exclusively for testing only */ class MockSpanExporter final : public sdk::trace::SpanExporter { public: MockSpanExporter( std::shared_ptr>> spans_received, std::shared_ptr> shut_down_counter, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept : spans_received_(std::move(spans_received)), shut_down_counter_(std::move(shut_down_counter)), is_shutdown_(std::move(is_shutdown)), is_export_completed_(std::move(is_export_completed)), export_delay_(export_delay) {} std::unique_ptr MakeRecordable() noexcept override { return std::unique_ptr(new sdk::trace::SpanData); } sdk::common::ExportResult Export( const nostd::span> &recordables) noexcept override { *is_export_completed_ = false; std::this_thread::sleep_for(export_delay_); for (auto &recordable : recordables) { auto span = std::unique_ptr( static_cast(recordable.release())); if (span != nullptr) { spans_received_->push_back(std::move(span)); } } *is_export_completed_ = true; return sdk::common::ExportResult::kSuccess; } bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override { ++(*shut_down_counter_); return true; } bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { *is_shutdown_ = true; return true; } bool IsExportCompleted() { return is_export_completed_->load(); } private: std::shared_ptr>> spans_received_; std::shared_ptr> shut_down_counter_; std::shared_ptr> is_shutdown_; std::shared_ptr> is_export_completed_; // Meant exclusively to test force flush timeout const std::chrono::milliseconds export_delay_; }; /** * Fixture Class */ class BatchSpanProcessorTestPeer : public testing::Test { public: std::unique_ptr>> GetTestSpans( const std::shared_ptr &processor, const int num_spans) { std::unique_ptr>> test_spans( new std::vector>); for (int i = 0; i < num_spans; ++i) { test_spans->push_back(processor->MakeRecordable()); static_cast(test_spans->at(i).get()) ->SetName("Span " + std::to_string(i)); } return test_spans; } }; /* ################################## TESTS ############################################ */ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) { std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( std::unique_ptr( new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 3; auto test_spans = GetTestSpans(batch_processor, num_spans); for (int i = 0; i < num_spans; ++i) { batch_processor->OnEnd(std::move(test_spans->at(i))); } EXPECT_TRUE(batch_processor->Shutdown()); // It's safe to shutdown again EXPECT_TRUE(batch_processor->Shutdown()); EXPECT_EQ(num_spans, spans_received->size()); for (int i = 0; i < num_spans; ++i) { EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); } EXPECT_TRUE(is_shutdown->load()); } TEST_F(BatchSpanProcessorTestPeer, TestForceFlush) { std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( std::unique_ptr( new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), sdk::trace::BatchSpanProcessorOptions())); const int num_spans = 2048; auto test_spans = GetTestSpans(batch_processor, num_spans); for (int i = 0; i < num_spans; ++i) { batch_processor->OnEnd(std::move(test_spans->at(i))); } // Give some time to export std::this_thread::sleep_for(std::chrono::milliseconds(50)); EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_GE(shut_down_counter->load(), 1); EXPECT_EQ(num_spans, spans_received->size()); for (int i = 0; i < num_spans; ++i) { EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); } // Create some more spans to make sure that the processor still works auto more_test_spans = GetTestSpans(batch_processor, num_spans); for (int i = 0; i < num_spans; ++i) { batch_processor->OnEnd(std::move(more_test_spans->at(i))); } // Give some time to export the spans std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto shut_down_counter_before = shut_down_counter->load(); EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_GT(shut_down_counter->load(), shut_down_counter_before); EXPECT_EQ(num_spans * 2, spans_received->size()); for (int i = 0; i < num_spans; ++i) { EXPECT_EQ("Span " + std::to_string(i % num_spans), spans_received->at(num_spans + i)->GetName()); } } // A mock log handler to check whether log messages with a specific level were emitted. struct MockLogHandler : public sdk::common::internal_log::LogHandler { using Message = std::pair; void Handle(sdk::common::internal_log::LogLevel level, const char * /*file*/, int /*line*/, const char *msg, const sdk::common::AttributeMap & /*attributes*/) noexcept override { messages.emplace_back(level, msg); } std::vector messages; }; TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss) { /* Test that when exporting more than max_queue_size spans, some are most likely lost*/ // Set up a log handler to verify a warning is generated. auto log_handler = nostd::shared_ptr(new MockLogHandler()); sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler); std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int max_queue_size = 4096; auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( std::unique_ptr( new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, max_queue_size); for (int i = 0; i < max_queue_size; ++i) { batch_processor->OnEnd(std::move(test_spans->at(i))); } // Give some time to export the spans std::this_thread::sleep_for(std::chrono::milliseconds(700)); EXPECT_TRUE(batch_processor->ForceFlush()); // Span should be exported by now EXPECT_GE(max_queue_size, spans_received->size()); // If we haven't received all spans, some must have dropped, verify a warning was logged. // Only do this when the log level is warning or above. #if OTEL_INTERNAL_LOG_LEVEL >= OTEL_INTERNAL_LOG_LEVEL_WARN if (max_queue_size > spans_received->size()) { auto &messages = static_cast(log_handler.get())->messages; EXPECT_TRUE( std::find(messages.begin(), messages.end(), MockLogHandler::Message(sdk::common::internal_log::LogLevel::Warning, "BatchSpanProcessor queue is full - dropping span.")) != messages.end()); } #endif // Reinstate the default log handler. sdk::common::internal_log::GlobalLogHandler::SetLogHandler( nostd::shared_ptr( new sdk::common::internal_log::DefaultLogHandler())); } TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess) { /* Test that no spans are lost when sending max_queue_size spans */ std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const int num_spans = 2048; auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( std::unique_ptr( new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)), sdk::trace::BatchSpanProcessorOptions())); auto test_spans = GetTestSpans(batch_processor, num_spans); for (int i = 0; i < num_spans; ++i) { batch_processor->OnEnd(std::move(test_spans->at(i))); } // Give some time to export the spans std::this_thread::sleep_for(std::chrono::milliseconds(50)); EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_EQ(num_spans, spans_received->size()); for (int i = 0; i < num_spans; ++i) { EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); } } TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis) { /* Test that max_export_batch_size spans are exported every schedule_delay_millis seconds */ std::shared_ptr> shut_down_counter(new std::atomic(0)); std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); std::shared_ptr>> spans_received( new std::vector>); const std::chrono::milliseconds export_delay(0); const size_t max_export_batch_size = 512; sdk::trace::BatchSpanProcessorOptions options{}; options.schedule_delay_millis = std::chrono::milliseconds(2000); auto batch_processor = std::shared_ptr(new sdk::trace::BatchSpanProcessor( std::unique_ptr(new MockSpanExporter( spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)), options)); auto test_spans = GetTestSpans(batch_processor, max_export_batch_size); for (size_t i = 0; i < max_export_batch_size; ++i) { batch_processor->OnEnd(std::move(test_spans->at(i))); } // Sleep for schedule_delay_millis milliseconds std::this_thread::sleep_for(options.schedule_delay_millis); // small delay to give time to export std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Spans should be exported by now EXPECT_TRUE(is_export_completed->load()); EXPECT_EQ(max_export_batch_size, spans_received->size()); for (size_t i = 0; i < max_export_batch_size; ++i) { EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName()); } } OPENTELEMETRY_END_NAMESPACE