Prevent concurrent `::export()` calls in span processor (#788)
* Prevent concurrent `::export()` calls in batch span processor * Prevent concurrent `::export()` calls in simple span processor * Allow disabling auto-flush in batch processor `::onEnd()` * Include in-flight batches in queue limit * Handle exporter exceptions to prevent termination of worker * Use `LogsMessagesTrait`
This commit is contained in:
parent
74eda3d89d
commit
2457c76015
|
@ -4,142 +4,213 @@ declare(strict_types=1);
|
|||
|
||||
namespace OpenTelemetry\SDK\Trace\SpanProcessor;
|
||||
|
||||
use function assert;
|
||||
use function count;
|
||||
use InvalidArgumentException;
|
||||
use OpenTelemetry\Context\Context;
|
||||
use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait;
|
||||
use OpenTelemetry\SDK\Common\Environment\Variables as Env;
|
||||
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
|
||||
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
|
||||
use OpenTelemetry\SDK\Common\Time\ClockFactory;
|
||||
use OpenTelemetry\SDK\Common\Time\ClockInterface;
|
||||
use OpenTelemetry\SDK\Common\Time\StopWatch;
|
||||
use OpenTelemetry\SDK\Common\Time\StopWatchFactory;
|
||||
use OpenTelemetry\SDK\Common\Time\Util as TimeUtil;
|
||||
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanDataInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
|
||||
use SplQueue;
|
||||
use function sprintf;
|
||||
use Throwable;
|
||||
|
||||
class BatchSpanProcessor implements SpanProcessorInterface
|
||||
{
|
||||
use EnvironmentVariablesTrait;
|
||||
use LogsMessagesTrait;
|
||||
|
||||
public const DEFAULT_SCHEDULE_DELAY = 5000;
|
||||
public const DEFAULT_EXPORT_TIMEOUT = 30000;
|
||||
public const DEFAULT_MAX_QUEUE_SIZE = 2048;
|
||||
public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
|
||||
|
||||
private ?SpanExporterInterface $exporter;
|
||||
private ?int $maxQueueSize;
|
||||
private ?int $scheduledDelayMillis;
|
||||
// @todo: Please, check if this code is needed. It creates an error in phpstan, since it's not used
|
||||
/** @phpstan-ignore-next-line */
|
||||
private ?int $exporterTimeoutMillis;
|
||||
private ?int $maxExportBatchSize;
|
||||
private bool $running = true;
|
||||
private StopWatch $stopwatch;
|
||||
private SpanExporterInterface $exporter;
|
||||
private ClockInterface $clock;
|
||||
private int $maxQueueSize;
|
||||
private int $scheduledDelayNanos;
|
||||
private int $maxExportBatchSize;
|
||||
private bool $autoFlush;
|
||||
|
||||
private ?int $nextScheduledRun = null;
|
||||
private bool $running = false;
|
||||
private int $batchId = 0;
|
||||
private int $queueSize = 0;
|
||||
/** @var list<SpanDataInterface> */
|
||||
private array $queue = [];
|
||||
private array $batch = [];
|
||||
/** @var SplQueue<list<SpanDataInterface>> */
|
||||
private SplQueue $queue;
|
||||
/** @var SplQueue<array{int, string, ?CancellationInterface, bool}> */
|
||||
private SplQueue $flush;
|
||||
|
||||
private bool $closed = false;
|
||||
|
||||
public function __construct(
|
||||
?SpanExporterInterface $exporter,
|
||||
ClockInterface $clock = null,
|
||||
int $maxQueueSize = null,
|
||||
int $scheduledDelayMillis = null,
|
||||
int $exporterTimeoutMillis = null,
|
||||
int $maxExportBatchSize = null
|
||||
SpanExporterInterface $exporter,
|
||||
ClockInterface $clock,
|
||||
int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE,
|
||||
int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY,
|
||||
int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT,
|
||||
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE,
|
||||
bool $autoFlush = true
|
||||
) {
|
||||
$this->exporter = $exporter;
|
||||
// @todo make the stopwatch a dependency rather than using the factory?
|
||||
$this->stopwatch = StopWatchFactory::create($clock ?? ClockFactory::getDefault())->build();
|
||||
$this->stopwatch->start();
|
||||
$this->maxQueueSize = $maxQueueSize
|
||||
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, self::DEFAULT_MAX_QUEUE_SIZE);
|
||||
$this->scheduledDelayMillis = $scheduledDelayMillis
|
||||
?: $this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, self::DEFAULT_SCHEDULE_DELAY);
|
||||
$this->exporterTimeoutMillis = $exporterTimeoutMillis
|
||||
?: $this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, self::DEFAULT_EXPORT_TIMEOUT);
|
||||
$this->maxExportBatchSize = $maxExportBatchSize
|
||||
?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, self::DEFAULT_MAX_EXPORT_BATCH_SIZE);
|
||||
if ($this->maxExportBatchSize > $this->maxQueueSize) {
|
||||
throw new InvalidArgumentException(
|
||||
sprintf('maxExportBatchSize should be smaller or equal to %s', $this->maxQueueSize)
|
||||
);
|
||||
if ($maxQueueSize <= 0) {
|
||||
throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
|
||||
}
|
||||
if ($scheduledDelayMillis <= 0) {
|
||||
throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
|
||||
}
|
||||
if ($exportTimeoutMillis <= 0) {
|
||||
throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
|
||||
}
|
||||
if ($maxExportBatchSize <= 0) {
|
||||
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
|
||||
}
|
||||
if ($maxExportBatchSize > $maxQueueSize) {
|
||||
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
|
||||
}
|
||||
|
||||
$this->exporter = $exporter;
|
||||
$this->clock = $clock;
|
||||
$this->maxQueueSize = $maxQueueSize;
|
||||
$this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000;
|
||||
$this->maxExportBatchSize = $maxExportBatchSize;
|
||||
$this->autoFlush = $autoFlush;
|
||||
|
||||
$this->queue = new SplQueue();
|
||||
$this->flush = new SplQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function onEnd(ReadableSpanInterface $span): void
|
||||
{
|
||||
if (null === $this->exporter) {
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
if (!$span->getContext()->isSampled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$this->running) {
|
||||
if ($this->queueSize === $this->maxQueueSize) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($span->getContext()->isSampled() && !$this->queueReachedLimit()) {
|
||||
$this->queue[] = $span->toSpanData();
|
||||
}
|
||||
$this->queueSize++;
|
||||
$this->batch[] = $span->toSpanData();
|
||||
$this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos;
|
||||
|
||||
if ($this->bufferReachedExportLimit() || $this->enoughTimeHasPassed()) {
|
||||
$this->forceFlush();
|
||||
if (count($this->batch) === $this->maxExportBatchSize) {
|
||||
$this->enqueueBatch();
|
||||
}
|
||||
if ($this->autoFlush) {
|
||||
$this->flush();
|
||||
}
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function forceFlush(?CancellationInterface $cancellation = null): bool
|
||||
{
|
||||
if (!$this->running || $this->exporter === null) {
|
||||
return true;
|
||||
if ($this->closed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->exporter->export($this->queue)->await();
|
||||
$this->queue = [];
|
||||
$this->stopwatch->reset();
|
||||
$this->exporter->forceFlush();
|
||||
|
||||
return true;
|
||||
return $this->flush(__FUNCTION__, $cancellation);
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function shutdown(?CancellationInterface $cancellation = null): bool
|
||||
{
|
||||
if (!$this->running) {
|
||||
return true;
|
||||
if ($this->closed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (null !== $this->exporter && $this->forceFlush()) {
|
||||
$this->exporter->shutdown();
|
||||
$this->closed = true;
|
||||
|
||||
return $this->flush(__FUNCTION__, $cancellation);
|
||||
}
|
||||
|
||||
private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool
|
||||
{
|
||||
if ($flushMethod !== null) {
|
||||
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
|
||||
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]);
|
||||
}
|
||||
$this->running = false;
|
||||
|
||||
return true;
|
||||
if ($this->running) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$success = true;
|
||||
$exception = null;
|
||||
$this->running = true;
|
||||
|
||||
try {
|
||||
for (;;) {
|
||||
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
|
||||
[, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue();
|
||||
|
||||
try {
|
||||
$result = $this->exporter->$flushMethod($cancellation);
|
||||
if ($propagateResult) {
|
||||
$success = $result;
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
if ($propagateResult) {
|
||||
$exception = $e;
|
||||
|
||||
continue;
|
||||
}
|
||||
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
|
||||
}
|
||||
}
|
||||
|
||||
if (!$this->shouldFlush()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if ($this->queue->isEmpty()) {
|
||||
$this->enqueueBatch();
|
||||
}
|
||||
$batchSize = count($this->queue->bottom());
|
||||
$this->batchId++;
|
||||
|
||||
try {
|
||||
$this->exporter->export($this->queue->dequeue())->await();
|
||||
} catch (Throwable $e) {
|
||||
self::logError('Unhandled export error', ['exception' => $e]);
|
||||
} finally {
|
||||
$this->queueSize -= $batchSize;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
$this->running = false;
|
||||
}
|
||||
|
||||
if ($exception !== null) {
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
return $success;
|
||||
}
|
||||
|
||||
protected function bufferReachedExportLimit(): bool
|
||||
private function shouldFlush(): bool
|
||||
{
|
||||
return count($this->queue) >= $this->maxExportBatchSize;
|
||||
return !$this->flush->isEmpty()
|
||||
|| $this->autoFlush && !$this->queue->isEmpty()
|
||||
|| $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
|
||||
}
|
||||
|
||||
protected function queueReachedLimit(): bool
|
||||
private function enqueueBatch(): void
|
||||
{
|
||||
return count($this->queue) >= $this->maxQueueSize;
|
||||
}
|
||||
assert($this->batch !== []);
|
||||
|
||||
protected function enoughTimeHasPassed(): bool
|
||||
{
|
||||
return TimeUtil::millisToNanos((int) $this->scheduledDelayMillis) < $this->stopwatch->getLastElapsedTime();
|
||||
$this->queue->enqueue($this->batch);
|
||||
$this->batch = [];
|
||||
$this->nextScheduledRun = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ declare(strict_types=1);
|
|||
|
||||
namespace OpenTelemetry\SDK\Trace\SpanProcessor;
|
||||
|
||||
use Closure;
|
||||
use OpenTelemetry\Context\Context;
|
||||
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
|
||||
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
|
||||
|
@ -11,56 +12,104 @@ use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
|
|||
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
|
||||
use SplQueue;
|
||||
use function sprintf;
|
||||
use Throwable;
|
||||
|
||||
class SimpleSpanProcessor implements SpanProcessorInterface
|
||||
{
|
||||
use LogsMessagesTrait;
|
||||
|
||||
private ?SpanExporterInterface $exporter;
|
||||
private bool $running = true;
|
||||
private SpanExporterInterface $exporter;
|
||||
|
||||
public function __construct(SpanExporterInterface $exporter = null)
|
||||
private bool $running = false;
|
||||
/** @var SplQueue<array{Closure, string, bool}> */
|
||||
private SplQueue $queue;
|
||||
|
||||
private bool $closed = false;
|
||||
|
||||
public function __construct(SpanExporterInterface $exporter)
|
||||
{
|
||||
$this->exporter = $exporter;
|
||||
|
||||
$this->queue = new SplQueue();
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void
|
||||
{
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function onEnd(ReadableSpanInterface $span): void
|
||||
{
|
||||
if (!$this->running || !$span->getContext()->isSampled()) {
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
if (!$span->getContext()->isSampled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (null !== $this->exporter) {
|
||||
$this->exporter->export([$span->toSpanData()])->await();
|
||||
}
|
||||
$spanData = $span->toSpanData();
|
||||
$this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export');
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function forceFlush(?CancellationInterface $cancellation = null): bool
|
||||
{
|
||||
return true;
|
||||
if ($this->closed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true);
|
||||
}
|
||||
|
||||
/** @inheritDoc */
|
||||
public function shutdown(?CancellationInterface $cancellation = null): bool
|
||||
{
|
||||
if (!$this->running) {
|
||||
return true;
|
||||
if ($this->closed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->running = false;
|
||||
self::logDebug('Shutting down span processor');
|
||||
$this->closed = true;
|
||||
|
||||
if (null !== $this->exporter) {
|
||||
return $this->forceFlush() && $this->exporter->shutdown();
|
||||
return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true);
|
||||
}
|
||||
|
||||
private function flush(Closure $task, string $taskName, bool $propagateResult = false): bool
|
||||
{
|
||||
$this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running]);
|
||||
|
||||
if ($this->running) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
$success = true;
|
||||
$exception = null;
|
||||
$this->running = true;
|
||||
|
||||
try {
|
||||
while (!$this->queue->isEmpty()) {
|
||||
[$task, $taskName, $propagateResult] = $this->queue->dequeue();
|
||||
|
||||
try {
|
||||
$result = $task();
|
||||
if ($propagateResult) {
|
||||
$success = $result;
|
||||
}
|
||||
} catch (Throwable $e) {
|
||||
if ($propagateResult) {
|
||||
$exception = $e;
|
||||
|
||||
continue;
|
||||
}
|
||||
self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
$this->running = false;
|
||||
}
|
||||
|
||||
if ($exception !== null) {
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
return $success;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use InvalidArgumentException;
|
|||
use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait;
|
||||
use OpenTelemetry\SDK\Common\Environment\KnownValues as Values;
|
||||
use OpenTelemetry\SDK\Common\Environment\Variables as Env;
|
||||
use OpenTelemetry\SDK\Common\Time\ClockFactory;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\NoopSpanProcessor;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor;
|
||||
|
@ -18,10 +19,21 @@ class SpanProcessorFactory
|
|||
|
||||
public function fromEnvironment(?SpanExporterInterface $exporter = null): SpanProcessorInterface
|
||||
{
|
||||
if ($exporter === null) {
|
||||
return new NoopSpanProcessor();
|
||||
}
|
||||
|
||||
$name = $this->getEnumFromEnvironment(Env::OTEL_PHP_TRACES_PROCESSOR);
|
||||
switch ($name) {
|
||||
case Values::VALUE_BATCH:
|
||||
return new BatchSpanProcessor($exporter);
|
||||
return new BatchSpanProcessor(
|
||||
$exporter,
|
||||
ClockFactory::getDefault(),
|
||||
$this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, BatchSpanProcessor::DEFAULT_MAX_QUEUE_SIZE),
|
||||
$this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, BatchSpanProcessor::DEFAULT_SCHEDULE_DELAY),
|
||||
$this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, BatchSpanProcessor::DEFAULT_EXPORT_TIMEOUT),
|
||||
$this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, BatchSpanProcessor::DEFAULT_MAX_EXPORT_BATCH_SIZE),
|
||||
);
|
||||
case Values::VALUE_SIMPLE:
|
||||
return new SimpleSpanProcessor($exporter);
|
||||
case Values::VALUE_NOOP:
|
||||
|
|
|
@ -14,6 +14,7 @@ use OpenTelemetry\SDK\Common\Attribute\Attributes;
|
|||
use OpenTelemetry\SDK\Resource\ResourceInfo;
|
||||
use OpenTelemetry\SDK\Trace\Sampler\AlwaysOnSampler;
|
||||
use OpenTelemetry\SDK\Trace\SamplerInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\NoopSpanProcessor;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor;
|
||||
use OpenTelemetry\SDK\Trace\TracerProvider;
|
||||
use Psr\Http\Client\ClientInterface;
|
||||
|
@ -44,7 +45,7 @@ class OtlpBench
|
|||
|
||||
public function setUpNoExporter(): void
|
||||
{
|
||||
$processor = new SimpleSpanProcessor();
|
||||
$processor = new NoopSpanProcessor();
|
||||
$provider = new TracerProvider($processor, $this->sampler, $this->resource);
|
||||
$this->tracer = $provider->getTracer('io.opentelemetry.contrib.php');
|
||||
}
|
||||
|
|
|
@ -4,29 +4,30 @@ declare(strict_types=1);
|
|||
|
||||
namespace OpenTelemetry\Tests\Unit\SDK\Trace\SpanProcessor;
|
||||
|
||||
use AssertWell\PHPUnitGlobalState\EnvironmentVariables;
|
||||
use Exception;
|
||||
use InvalidArgumentException;
|
||||
use LogicException;
|
||||
use Mockery;
|
||||
use Mockery\Adapter\Phpunit\MockeryTestCase;
|
||||
use OpenTelemetry\API\Trace as API;
|
||||
use OpenTelemetry\Context\Context;
|
||||
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
|
||||
use OpenTelemetry\SDK\Common\Log\LoggerHolder;
|
||||
use OpenTelemetry\SDK\Common\Time\ClockFactory;
|
||||
use OpenTelemetry\SDK\Common\Time\ClockInterface;
|
||||
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanDataInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
|
||||
use OpenTelemetry\Tests\Unit\SDK\Util\TestClock;
|
||||
use ReflectionObject;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\LogLevel;
|
||||
|
||||
/**
|
||||
* @covers OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor
|
||||
* @covers \OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor
|
||||
*/
|
||||
class BatchSpanProcessorTest extends MockeryTestCase
|
||||
{
|
||||
use EnvironmentVariables;
|
||||
|
||||
private TestClock $testClock;
|
||||
|
||||
protected function setUp(): void
|
||||
|
@ -39,18 +40,6 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
protected function tearDown(): void
|
||||
{
|
||||
ClockFactory::setDefault(null);
|
||||
$this->restoreEnvironmentVariables();
|
||||
}
|
||||
|
||||
public function test_allows_null_exporter(): void
|
||||
{
|
||||
$proc = new BatchSpanProcessor(null, $this->testClock);
|
||||
$span = $this->createSampledSpanMock();
|
||||
$proc->onStart($span, Context::getCurrent());
|
||||
$proc->onEnd($span);
|
||||
$proc->forceFlush();
|
||||
$proc->shutdown();
|
||||
$this->assertTrue(true); // phpunit requires an assertion
|
||||
}
|
||||
|
||||
public function test_export_batch_size_met(): void
|
||||
|
@ -83,27 +72,6 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
}
|
||||
}
|
||||
|
||||
public function test_export_batch_size_greater_than_queue_size_is_rejected(): void
|
||||
{
|
||||
$batchSize = 3;
|
||||
$queueSize = 2; // queue is smaller than batch
|
||||
$exportDelay = 3;
|
||||
$timeout = 3000;
|
||||
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
|
||||
$this->expectException(\InvalidArgumentException::class);
|
||||
/** @var SpanExporterInterface $exporter */
|
||||
$processor = new BatchSpanProcessor(
|
||||
$exporter,
|
||||
$this->testClock,
|
||||
$queueSize,
|
||||
$exportDelay,
|
||||
$timeout,
|
||||
$batchSize
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider scheduledDelayProvider
|
||||
*/
|
||||
|
@ -119,7 +87,7 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
}
|
||||
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->expects($this->exactly($expectedFlush ? 1 : 0))->method('forceFlush');
|
||||
$exporter->expects($this->exactly($expectedFlush ? 1 : 0))->method('export');
|
||||
|
||||
/** @var SpanExporterInterface $exporter */
|
||||
$processor = new BatchSpanProcessor(
|
||||
|
@ -162,7 +130,6 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
}
|
||||
|
||||
$exporter = Mockery::mock(SpanExporterInterface::class);
|
||||
$exporter->expects('forceFlush');
|
||||
$exporter
|
||||
->expects('export')
|
||||
->with(
|
||||
|
@ -328,6 +295,57 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
$processor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_queue_size_exceeded_drops_spans(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock, 5, 5000, 30000, 5);
|
||||
|
||||
$exporter->expects($this->exactly(2))->method('export')->willReturnCallback(function (iterable $batch) use ($processor, &$i) {
|
||||
if ($i) {
|
||||
$this->assertCount(3, $batch);
|
||||
} else {
|
||||
for ($i = 0; $i < 5; $i++) {
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$processor->forceFlush();
|
||||
$processor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_force_flush_applies_only_to_current_spans(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock);
|
||||
|
||||
$exporter->expects($this->exactly(1))->method('export')->willReturnCallback(function (iterable $batch) use ($processor) {
|
||||
$this->assertCount(1, $batch);
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$processor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_shutdown_shutdowns_exporter(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
|
@ -337,34 +355,136 @@ class BatchSpanProcessorTest extends MockeryTestCase
|
|||
$processor->shutdown();
|
||||
}
|
||||
|
||||
public function test_create_from_environment_variables(): void
|
||||
public function test_throwing_exporter_export(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->method('forceFlush')->willReturn(true);
|
||||
$exporter->method('export')->willThrowException(new LogicException());
|
||||
|
||||
$input = [
|
||||
['OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 'maxExportBatchSize', 1],
|
||||
['OTEL_BSP_MAX_QUEUE_SIZE', 'maxQueueSize', 2],
|
||||
['OTEL_BSP_SCHEDULE_DELAY', 'scheduledDelayMillis', 3],
|
||||
['OTEL_BSP_EXPORT_TIMEOUT', 'exporterTimeoutMillis', 4],
|
||||
];
|
||||
foreach ($input as $i) {
|
||||
$this->setEnvironmentVariable($i[0], $i[2]);
|
||||
}
|
||||
$processor = new BatchSpanProcessor($exporter);
|
||||
$reflection = new ReflectionObject($processor);
|
||||
foreach ($input as $i) {
|
||||
$attr = $reflection->getProperty($i[1]);
|
||||
$attr->setAccessible(true);
|
||||
$this->assertEquals($i[2], $attr->getValue($processor));
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('log')->with(LogLevel::ERROR);
|
||||
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock);
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$previousLogger = LoggerHolder::get();
|
||||
LoggerHolder::set($logger);
|
||||
|
||||
try {
|
||||
$processor->forceFlush();
|
||||
} finally {
|
||||
LoggerHolder::set($previousLogger);
|
||||
}
|
||||
}
|
||||
|
||||
public function test_create_non_numeric_environment_value_throws_exception(): void
|
||||
public function test_throwing_exporter_flush(): void
|
||||
{
|
||||
$this->setEnvironmentVariable('OTEL_BSP_MAX_QUEUE_SIZE', 'fruit');
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$this->expectException(Exception::class);
|
||||
new BatchSpanProcessor($exporter);
|
||||
$exporter->method('forceFlush')->willThrowException(new LogicException());
|
||||
|
||||
$this->expectException(LogicException::class);
|
||||
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock);
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$processor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_throwing_exporter_flush_cannot_rethrow_in_original_caller_logs_error(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) {
|
||||
/** @var SpanProcessorInterface $processor */
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
return $processor->shutdown();
|
||||
});
|
||||
$exporter->method('shutdown')->willThrowException(new LogicException());
|
||||
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('log')->with(LogLevel::ERROR);
|
||||
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock);
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$previousLogger = LoggerHolder::get();
|
||||
LoggerHolder::set($logger);
|
||||
|
||||
try {
|
||||
$processor->forceFlush();
|
||||
} finally {
|
||||
LoggerHolder::set($previousLogger);
|
||||
}
|
||||
}
|
||||
|
||||
public function test_throwing_exporter_flush_rethrows_in_original_caller(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) {
|
||||
/** @var SpanProcessorInterface $processor */
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
$processor->shutdown();
|
||||
|
||||
throw new LogicException();
|
||||
});
|
||||
$exporter->expects($this->once())->method('shutdown');
|
||||
|
||||
$this->expectException(LogicException::class);
|
||||
|
||||
$processor = new BatchSpanProcessor($exporter, $this->testClock);
|
||||
|
||||
$span = $this->createSampledSpanMock();
|
||||
$processor->onStart($span, Context::getCurrent());
|
||||
$processor->onEnd($span);
|
||||
|
||||
$processor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_span_processor_throws_on_invalid_max_queue_size(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
new BatchSpanProcessor($exporter, $this->testClock, -1);
|
||||
}
|
||||
|
||||
public function test_span_processor_throws_on_invalid_scheduled_delay(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
new BatchSpanProcessor($exporter, $this->testClock, 2048, -1);
|
||||
}
|
||||
|
||||
public function test_span_processor_throws_on_invalid_export_timeout(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
new BatchSpanProcessor($exporter, $this->testClock, 2048, 5000, -1);
|
||||
}
|
||||
|
||||
public function test_span_processor_throws_on_invalid_max_export_batch_size(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
new BatchSpanProcessor($exporter, $this->testClock, 2048, 5000, 30000, -1);
|
||||
}
|
||||
|
||||
public function test_span_processor_throws_on_invalid_max_export_batch_size_exceeding_max_queue_size(): void
|
||||
{
|
||||
$this->expectException(InvalidArgumentException::class);
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
new BatchSpanProcessor($exporter, $this->testClock, 2, 5000, 30000, 3);
|
||||
}
|
||||
|
||||
private function createSampledSpanMock()
|
||||
|
|
|
@ -4,6 +4,7 @@ declare(strict_types=1);
|
|||
|
||||
namespace OpenTelemetry\Tests\Unit\SDK\Trace\SpanProcessor;
|
||||
|
||||
use LogicException;
|
||||
use Mockery;
|
||||
use Mockery\Adapter\Phpunit\MockeryTestCase;
|
||||
use Mockery\MockInterface;
|
||||
|
@ -11,11 +12,14 @@ use OpenTelemetry\API\Trace\SpanContext;
|
|||
use OpenTelemetry\API\Trace\SpanContextInterface;
|
||||
use OpenTelemetry\Context\Context;
|
||||
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
|
||||
use OpenTelemetry\SDK\Common\Log\LoggerHolder;
|
||||
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
|
||||
use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor;
|
||||
use OpenTelemetry\Tests\Unit\SDK\Util\SpanData;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\LogLevel;
|
||||
|
||||
/**
|
||||
* @covers \OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor
|
||||
|
@ -59,6 +63,14 @@ class SimpleSpanProcessorTest extends MockeryTestCase
|
|||
$this->spanExporter->shouldNotReceive('export');
|
||||
}
|
||||
|
||||
public function test_on_end_after_shutdown(): void
|
||||
{
|
||||
$this->spanExporter->shouldReceive('shutdown');
|
||||
$this->spanExporter->shouldNotReceive('export');
|
||||
$this->simpleSpanProcessor->shutdown();
|
||||
$this->simpleSpanProcessor->onEnd($this->readableSpan);
|
||||
}
|
||||
|
||||
public function test_on_end_sampled_span(): void
|
||||
{
|
||||
$spanData = new SpanData();
|
||||
|
@ -76,24 +88,85 @@ class SimpleSpanProcessorTest extends MockeryTestCase
|
|||
$this->simpleSpanProcessor->onEnd($this->readableSpan);
|
||||
}
|
||||
|
||||
public function test_does_not_trigger_concurrent_export(): void
|
||||
{
|
||||
$spanData = new SpanData();
|
||||
$count = 3;
|
||||
$this->readableSpan->expects('getContext')->times($count)->andReturn($this->sampledSpanContext);
|
||||
$this->readableSpan->expects('toSpanData')->times($count)->andReturn($spanData);
|
||||
|
||||
$this->spanExporter->expects('export')->times($count)->andReturnUsing(function () use (&$running, &$count) {
|
||||
$this->assertNotTrue($running);
|
||||
$running = true;
|
||||
if (--$count) {
|
||||
$this->simpleSpanProcessor->onEnd($this->readableSpan);
|
||||
}
|
||||
$running = false;
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
$this->simpleSpanProcessor->onEnd($this->readableSpan);
|
||||
}
|
||||
|
||||
// TODO: Add test to ensure exporter is retried on failure.
|
||||
|
||||
public function test_force_flush(): void
|
||||
{
|
||||
$this->spanExporter->expects('forceFlush')->andReturn(true);
|
||||
$this->assertTrue($this->simpleSpanProcessor->forceFlush());
|
||||
}
|
||||
|
||||
public function test_force_flush_after_shutdown(): void
|
||||
{
|
||||
$this->spanExporter->expects('shutdown')->andReturn(true);
|
||||
$this->spanExporter->shouldNotReceive('forceFlush');
|
||||
$this->simpleSpanProcessor->shutdown();
|
||||
$this->simpleSpanProcessor->forceFlush();
|
||||
}
|
||||
|
||||
public function test_shutdown(): void
|
||||
{
|
||||
$this->spanExporter->expects('shutdown')->andReturnTrue();
|
||||
|
||||
$this->assertTrue($this->simpleSpanProcessor->shutdown());
|
||||
$this->assertTrue($this->simpleSpanProcessor->shutdown());
|
||||
$this->assertFalse($this->simpleSpanProcessor->shutdown());
|
||||
}
|
||||
|
||||
public function test_shutdown_with_no_exporter(): void
|
||||
public function test_throwing_exporter_export(): void
|
||||
{
|
||||
$processor = new SimpleSpanProcessor(null);
|
||||
$this->assertTrue($processor->shutdown());
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->method('forceFlush')->willReturn(true);
|
||||
$exporter->method('export')->willThrowException(new LogicException());
|
||||
|
||||
$logger = $this->createMock(LoggerInterface::class);
|
||||
$logger->expects($this->once())->method('log')->with(LogLevel::ERROR);
|
||||
|
||||
$processor = new SimpleSpanProcessor($exporter);
|
||||
|
||||
$this->readableSpan->expects('getContext')->andReturn($this->sampledSpanContext);
|
||||
$this->readableSpan->expects('toSpanData')->andReturn(new SpanData());
|
||||
|
||||
$previousLogger = LoggerHolder::get();
|
||||
LoggerHolder::set($logger);
|
||||
|
||||
try {
|
||||
$processor->onStart($this->readWriteSpan, Context::getCurrent());
|
||||
$processor->onEnd($this->readableSpan);
|
||||
} finally {
|
||||
LoggerHolder::set($previousLogger);
|
||||
}
|
||||
}
|
||||
|
||||
public function test_throwing_exporter_flush(): void
|
||||
{
|
||||
$exporter = $this->createMock(SpanExporterInterface::class);
|
||||
$exporter->method('forceFlush')->willThrowException(new LogicException());
|
||||
|
||||
$this->expectException(LogicException::class);
|
||||
|
||||
$processor = new SimpleSpanProcessor($exporter);
|
||||
|
||||
$processor->forceFlush();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ class SpanProcessorFactoryTest extends TestCase
|
|||
{
|
||||
$this->setEnvironmentVariable('OTEL_PHP_TRACES_PROCESSOR', $processorName);
|
||||
$factory = new SpanProcessorFactory();
|
||||
$this->assertInstanceOf($expected, $factory->fromEnvironment());
|
||||
$this->assertInstanceOf($expected, $factory->fromEnvironment($this->createMock(SpanExporterInterface::class)));
|
||||
}
|
||||
|
||||
public function processorProvider()
|
||||
|
|
Loading…
Reference in New Issue