Always weaken asynchronous callbacks (#1081)
* Always weaken asynchronous callbacks * Update comment regarding weak callbacks
This commit is contained in:
parent
cf8be76463
commit
926f6e5583
|
|
@ -12,9 +12,10 @@ use OpenTelemetry\SDK\Resource\ResourceInfoFactory;
|
|||
require 'vendor/autoload.php';
|
||||
|
||||
/**
|
||||
* Example of using `weaken` on observer callbacks. Creates a weak reference to the
|
||||
* callback, so that internal references to the callback do not stop garbage collection on
|
||||
* the original object
|
||||
* Example of using weakly referenced observer callbacks. Binds the lifetime of
|
||||
* a callback to its bound object, the returned `ObserverCallbackInterface` is
|
||||
* ignored to automatically detach the callback once the original object is
|
||||
* garbage collected.
|
||||
*/
|
||||
|
||||
$reader = new ExportingReader(
|
||||
|
|
@ -36,7 +37,7 @@ $callback = new class() {
|
|||
$meterProvider
|
||||
->getMeter('demo_meter')
|
||||
->createObservableGauge('number', 'items', 'Random number')
|
||||
->observe($callback, true); //weak-ref to callback
|
||||
->observe($callback); //weak-ref to callback
|
||||
|
||||
$reader->collect(); //metrics (data-points) collected (callback invoked)
|
||||
unset($callback);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,48 @@ declare(strict_types=1);
|
|||
|
||||
namespace OpenTelemetry\API\Metrics;
|
||||
|
||||
/**
|
||||
* An observed callback.
|
||||
*
|
||||
* Callbacks that are bound to an object are automatically detached when the
|
||||
* `ObservableCallbackInterface` and the bound object are out of scope.
|
||||
* This means that the `ObservableCallbackInterface` can be ignored if the
|
||||
* observed callback should be bound to the lifetime of the object.
|
||||
* ```php
|
||||
* class Example {
|
||||
* function __construct(MeterProviderInterface $meterProvider) {
|
||||
* $meterProvider->getMeter('example')
|
||||
* ->createObservableGauge('random')
|
||||
* ->observe(fn(ObserverInterface $observer)
|
||||
* => $observer->observe(rand(0, 10)));
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
* Keeping a reference to the `ObservableCallbackInterface` within the bound
|
||||
* object to gain a more fine-grained control over the life-time of the callback
|
||||
* does not prevent garbage collection (but might require cycle collection).
|
||||
*
|
||||
* Unbound (static) callbacks must be detached manually using
|
||||
* {@link ObservableCallbackInterface::detach()}.
|
||||
* ```php
|
||||
* class Example {
|
||||
* private ObservableCallbackInterface $gauge;
|
||||
* function __construct(MeterProviderInterface $meterProvider) {
|
||||
* $this->gauge = $meterProvider->getMeter('example')
|
||||
* ->createObservableGauge('random')
|
||||
* ->observe(static fn(ObserverInterface $observer)
|
||||
* => $observer->observe(rand(0, 10)));
|
||||
* }
|
||||
* function __destruct() {
|
||||
* $this->gauge->detach();
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @see ObservableCounterInterface::observe()
|
||||
* @see ObservableGaugeInterface::observe()
|
||||
* @see ObservableUpDownCounterInterface::observe()
|
||||
*/
|
||||
interface ObservableCallbackInterface
|
||||
{
|
||||
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ interface ObservableCounterInterface
|
|||
/**
|
||||
* @param callable(ObserverInterface): void $callback function responsible for
|
||||
* reporting the measurements (as absolute values)
|
||||
* @param bool $weaken Create a weak reference to the callback so that it
|
||||
* does not stop garbage collection
|
||||
* @return ObservableCallbackInterface token to detach callback
|
||||
*/
|
||||
public function observe(callable $callback, bool $weaken = false): ObservableCallbackInterface;
|
||||
public function observe(callable $callback): ObservableCallbackInterface;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ interface ObservableGaugeInterface
|
|||
/**
|
||||
* @param callable(ObserverInterface): void $callback function responsible for
|
||||
* reporting the measurements
|
||||
* @param bool $weaken Create a weak reference to the callback so that it
|
||||
* does not stop garbage collection
|
||||
* @return ObservableCallbackInterface token to detach callback
|
||||
*/
|
||||
public function observe(callable $callback, bool $weaken = false): ObservableCallbackInterface;
|
||||
public function observe(callable $callback): ObservableCallbackInterface;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ interface ObservableUpDownCounterInterface
|
|||
/**
|
||||
* @param callable(ObserverInterface): void $callback function responsible for
|
||||
* reporting the measurements (as absolute values)
|
||||
* @param bool $weaken Create a weak reference to the callback so that it
|
||||
* does not stop garbage collection
|
||||
* @return ObservableCallbackInterface token to detach callback
|
||||
*/
|
||||
public function observe(callable $callback, bool $weaken = false): ObservableCallbackInterface;
|
||||
public function observe(callable $callback): ObservableCallbackInterface;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ class BatchLogRecordProcessor implements LogRecordProcessorInterface
|
|||
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
||||
$observer->observe($processed, self::ATTRIBUTES_PROCESSED);
|
||||
$observer->observe($dropped, self::ATTRIBUTES_DROPPED);
|
||||
}, true);
|
||||
});
|
||||
$meter
|
||||
->createObservableUpDownCounter(
|
||||
'otel.logs.log_processor.queue.limit',
|
||||
|
|
@ -124,7 +124,7 @@ class BatchLogRecordProcessor implements LogRecordProcessorInterface
|
|||
)
|
||||
->observe(function (ObserverInterface $observer): void {
|
||||
$observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR);
|
||||
}, true);
|
||||
});
|
||||
$meter
|
||||
->createObservableUpDownCounter(
|
||||
'otel.logs.log_processor.queue.usage',
|
||||
|
|
@ -139,7 +139,7 @@ class BatchLogRecordProcessor implements LogRecordProcessorInterface
|
|||
$observer->observe($queued, self::ATTRIBUTES_QUEUED);
|
||||
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
||||
$observer->observe($free, self::ATTRIBUTES_FREE);
|
||||
}, true);
|
||||
});
|
||||
}
|
||||
|
||||
public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void
|
||||
|
|
|
|||
|
|
@ -16,13 +16,16 @@ final class ObservableCallback implements ObservableCallbackInterface
|
|||
private ReferenceCounterInterface $referenceCounter;
|
||||
private ?int $callbackId;
|
||||
private ?ObservableCallbackDestructor $callbackDestructor;
|
||||
/** @phpstan-ignore-next-line */
|
||||
private ?object $target;
|
||||
|
||||
public function __construct(MetricWriterInterface $writer, ReferenceCounterInterface $referenceCounter, int $callbackId, ?ObservableCallbackDestructor $callbackDestructor)
|
||||
public function __construct(MetricWriterInterface $writer, ReferenceCounterInterface $referenceCounter, int $callbackId, ?ObservableCallbackDestructor $callbackDestructor, ?object $target)
|
||||
{
|
||||
$this->writer = $writer;
|
||||
$this->referenceCounter = $referenceCounter;
|
||||
$this->callbackId = $callbackId;
|
||||
$this->callbackDestructor = $callbackDestructor;
|
||||
$this->target = $target;
|
||||
}
|
||||
|
||||
public function detach(): void
|
||||
|
|
|
|||
|
|
@ -43,13 +43,9 @@ trait ObservableInstrumentTrait
|
|||
/**
|
||||
* @param callable(ObserverInterface): void $callback
|
||||
*/
|
||||
public function observe(callable $callback, bool $weaken = false): ObservableCallbackInterface
|
||||
public function observe(callable $callback): ObservableCallbackInterface
|
||||
{
|
||||
$target = null;
|
||||
$callback = closure($callback);
|
||||
if ($weaken) {
|
||||
$callback = weaken($callback, $target);
|
||||
}
|
||||
$callback = weaken(closure($callback), $target);
|
||||
|
||||
$callbackId = $this->writer->registerCallback($callback, $this->instrument);
|
||||
$this->referenceCounter->acquire();
|
||||
|
|
@ -60,6 +56,6 @@ trait ObservableInstrumentTrait
|
|||
$destructor->callbackIds[$callbackId] = $callbackId;
|
||||
}
|
||||
|
||||
return new ObservableCallback($this->writer, $this->referenceCounter, $callbackId, $destructor);
|
||||
return new ObservableCallback($this->writer, $this->referenceCounter, $callbackId, $destructor, $target);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
|
|||
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
||||
$observer->observe($processed, self::ATTRIBUTES_PROCESSED);
|
||||
$observer->observe($dropped, self::ATTRIBUTES_DROPPED);
|
||||
}, true);
|
||||
});
|
||||
$meter
|
||||
->createObservableUpDownCounter(
|
||||
'otel.trace.span_processor.queue.limit',
|
||||
|
|
@ -129,7 +129,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
|
|||
)
|
||||
->observe(function (ObserverInterface $observer): void {
|
||||
$observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR);
|
||||
}, true);
|
||||
});
|
||||
$meter
|
||||
->createObservableUpDownCounter(
|
||||
'otel.trace.span_processor.queue.usage',
|
||||
|
|
@ -144,7 +144,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
|
|||
$observer->observe($queued, self::ATTRIBUTES_QUEUED);
|
||||
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
||||
$observer->observe($free, self::ATTRIBUTES_FREE);
|
||||
}, true);
|
||||
});
|
||||
}
|
||||
|
||||
public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ final class MeterProviderTest extends TestCase
|
|||
$meterProvider
|
||||
->getMeter('test')
|
||||
->createObservableUpDownCounter('test')
|
||||
->observe(fn (ObserverInterface $observer) => $observer->observe($this->count()), true);
|
||||
->observe(fn (ObserverInterface $observer) => $observer->observe($this->count()));
|
||||
}
|
||||
public function count(): int
|
||||
{
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ final class InstrumentTest extends TestCase
|
|||
};
|
||||
|
||||
$c = new ObservableCounter($w, $i, new NoopStalenessHandler(), WeakMap::create());
|
||||
$c->observe($instance, true);
|
||||
$c->observe($instance);
|
||||
$instance = null;
|
||||
|
||||
$w->collectAndPush([$n]);
|
||||
|
|
@ -209,7 +209,7 @@ final class InstrumentTest extends TestCase
|
|||
$referenceCounter = $this->createMock(ReferenceCounterInterface::class);
|
||||
$referenceCounter->expects($this->once())->method('release');
|
||||
|
||||
$callback = new ObservableCallback($writer, $referenceCounter, 1, null);
|
||||
$callback = new ObservableCallback($writer, $referenceCounter, 1, null, null);
|
||||
$callback->detach();
|
||||
}
|
||||
|
||||
|
|
@ -224,7 +224,7 @@ final class InstrumentTest extends TestCase
|
|||
$callbackDestructor = new ObservableCallbackDestructor($writer, $referenceCounter);
|
||||
$callbackDestructor->callbackIds[1] = 1;
|
||||
|
||||
$callback = new ObservableCallback($writer, $referenceCounter, 1, $callbackDestructor);
|
||||
$callback = new ObservableCallback($writer, $referenceCounter, 1, $callbackDestructor, null);
|
||||
$callback->detach();
|
||||
|
||||
$this->assertArrayNotHasKey(1, $callbackDestructor->callbackIds);
|
||||
|
|
@ -241,7 +241,7 @@ final class InstrumentTest extends TestCase
|
|||
$referenceCounter->expects($this->once())->method('release');
|
||||
|
||||
/** @noinspection PhpExpressionResultUnusedInspection */
|
||||
new ObservableCallback($writer, $referenceCounter, 1, null);
|
||||
new ObservableCallback($writer, $referenceCounter, 1, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -257,6 +257,6 @@ final class InstrumentTest extends TestCase
|
|||
$callbackDestructor->callbackIds[1] = 1;
|
||||
|
||||
/** @noinspection PhpExpressionResultUnusedInspection */
|
||||
new ObservableCallback($writer, $referenceCounter, 1, $callbackDestructor);
|
||||
new ObservableCallback($writer, $referenceCounter, 1, $callbackDestructor, null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue