Allow multiple async callbacks, allow callbacks to be removed (#4143)

* Allow multiple async callbacks, allow callbacks to be removed

* Use AutoCloseable to remove callbacks, don't unregister from MetricStorageRegistry

* Use CopyOnWriteArrayList

* PR feedback

* PR feedback
This commit is contained in:
jack-berg 2022-02-25 11:51:51 -06:00 committed by GitHub
parent c053393c32
commit 9f10e0048e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 739 additions and 297 deletions

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* DoubleCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleCounter {}
public interface ObservableDoubleCounter extends AutoCloseable {
/**
* Remove the callback registered via {@link DoubleCounterBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* DoubleGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleGauge {}
public interface ObservableDoubleGauge extends AutoCloseable {
/**
* Remove the callback registered via {@link DoubleGaugeBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleUpDownCounter {}
public interface ObservableDoubleUpDownCounter extends AutoCloseable {
/**
* Remove the callback registered via {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback
* won't be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* LongCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongCounter {}
public interface ObservableLongCounter extends AutoCloseable {
/**
* Remove the callback registered via {@link LongCounterBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* LongGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongGauge {}
public interface ObservableLongGauge extends AutoCloseable {
/**
* Remove the callback registered via {@link LongGaugeBuilder#buildWithCallback(Consumer)}. After
* this is called, the callback won't be invoked on future collections. Subsequent calls to {@link
* #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -10,8 +10,16 @@ import java.util.function.Consumer;
/**
* A reference to an observable metric registered with {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongUpDownCounter {}
public interface ObservableLongUpDownCounter extends AutoCloseable {
/**
* Remove the callback registered via {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback won't
* be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}

View File

@ -2,3 +2,27 @@ Comparing source compatibility of against
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.common.AttributesBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++! NEW METHOD: PUBLIC(+) io.opentelemetry.api.common.AttributesBuilder put(io.opentelemetry.api.common.AttributeKey, java.lang.Object[])
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()

View File

@ -51,7 +51,6 @@ class TestSourceInfo {
.contains("- Description [description2] does not match [description]")
.contains("- InstrumentType [COUNTER] does not match [OBSERVABLE_COUNTER]")
.contains("- InstrumentValueType [LONG] does not match [DOUBLE]")
.contains("- InstrumentType [OBSERVABLE_COUNTER] is async and already registered")
.contains(simple.getSourceInstrument().getSourceInfo().multiLineDebugString())
.contains("Original instrument registered with same name but is incompatible.")
.contains(

View File

@ -10,9 +10,11 @@ import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@ -21,10 +23,11 @@ abstract class AbstractInstrumentBuilder<BuilderT extends AbstractInstrumentBuil
private final MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState;
private final String instrumentName;
private String description;
private String unit;
protected final String instrumentName;
AbstractInstrumentBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
@ -69,17 +72,19 @@ abstract class AbstractInstrumentBuilder<BuilderT extends AbstractInstrumentBuil
return instrumentFactory.apply(descriptor, storage);
}
final void registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>>
registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.DOUBLE);
meterSharedState.registerDoubleAsynchronousInstrument(
return meterSharedState.registerDoubleAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}
final void registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableLongMeasurement>>
registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.LONG);
meterSharedState.registerLongAsynchronousInstrument(
return meterSharedState.registerLongAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}

View File

@ -102,8 +102,6 @@ final class SdkDoubleCounter extends AbstractInstrument implements DoubleCounter
static final class Builder extends AbstractInstrumentBuilder<SdkDoubleCounter.Builder>
implements DoubleCounterBuilder {
private static final ObservableDoubleCounter NOOP = new ObservableDoubleCounter() {};
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
@ -127,8 +125,10 @@ final class SdkDoubleCounter extends AbstractInstrument implements DoubleCounter
@Override
public ObservableDoubleCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}

View File

@ -17,8 +17,6 @@ import java.util.function.Consumer;
final class SdkDoubleGaugeBuilder extends AbstractInstrumentBuilder<SdkDoubleGaugeBuilder>
implements DoubleGaugeBuilder {
private static final ObservableDoubleGauge NOOP = new ObservableDoubleGauge() {};
SdkDoubleGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
@ -47,7 +45,9 @@ final class SdkDoubleGaugeBuilder extends AbstractInstrumentBuilder<SdkDoubleGau
@Override
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}

View File

@ -22,7 +22,6 @@ import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class SdkDoubleUpDownCounter extends AbstractInstrument implements DoubleUpDownCounter {
private static final ObservableDoubleUpDownCounter NOOP = new ObservableDoubleUpDownCounter() {};
private final WriteableMetricStorage storage;
@ -101,8 +100,10 @@ final class SdkDoubleUpDownCounter extends AbstractInstrument implements DoubleU
@Override
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}

View File

@ -26,7 +26,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
final class SdkLongCounter extends AbstractInstrument implements LongCounter {
private static final ObservableLongCounter NOOP = new ObservableLongCounter() {};
private static final Logger logger = Logger.getLogger(SdkLongCounter.class.getName());
@ -139,8 +138,10 @@ final class SdkLongCounter extends AbstractInstrument implements LongCounter {
@Override
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}

View File

@ -16,8 +16,6 @@ import java.util.function.Consumer;
final class SdkLongGaugeBuilder extends AbstractInstrumentBuilder<SdkLongGaugeBuilder>
implements LongGaugeBuilder {
private static final ObservableLongGauge NOOP = new ObservableLongGauge() {};
SdkLongGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
@ -34,7 +32,9 @@ final class SdkLongGaugeBuilder extends AbstractInstrumentBuilder<SdkLongGaugeBu
@Override
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}

View File

@ -23,7 +23,6 @@ import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class SdkLongUpDownCounter extends AbstractInstrument implements LongUpDownCounter {
private static final ObservableLongUpDownCounter NOOP = new ObservableLongUpDownCounter() {};
private final WriteableMetricStorage storage;
@ -114,8 +113,10 @@ final class SdkLongUpDownCounter extends AbstractInstrument implements LongUpDow
@Override
public ObservableLongUpDownCounter buildWithCallback(
Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.ObservableDoubleCounter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
class SdkObservableInstrument<O>
implements ObservableDoubleCounter,
ObservableLongCounter,
ObservableDoubleGauge,
ObservableLongGauge,
ObservableDoubleUpDownCounter,
ObservableLongUpDownCounter {
private static final Logger logger = Logger.getLogger(SdkObservableInstrument.class.getName());
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
private final String instrumentName;
private final List<AsynchronousMetricStorage<?, O>> storages;
private final Consumer<O> callback;
private final AtomicBoolean removed = new AtomicBoolean(false);
SdkObservableInstrument(
String instrumentName, List<AsynchronousMetricStorage<?, O>> storages, Consumer<O> callback) {
this.instrumentName = instrumentName;
this.storages = storages;
this.callback = callback;
}
@Override
public void close() {
if (!removed.compareAndSet(false, true)) {
throttlingLogger.log(
Level.WARNING, "Instrument " + instrumentName + " has called close() multiple times.");
return;
}
storages.forEach(storage -> storage.removeCallback(callback));
}
}

View File

@ -86,20 +86,4 @@ public abstract class MetricDescriptor {
&& Objects.equals(
getSourceInstrument().getValueType(), other.getSourceInstrument().getValueType());
}
/** Returns whether the descriptor describes an async {@link InstrumentType}. */
public boolean isAsync() {
switch (getSourceInstrument().getType()) {
case OBSERVABLE_UP_DOWN_COUNTER:
case OBSERVABLE_GAUGE:
case OBSERVABLE_COUNTER:
return true;
case HISTOGRAM:
case COUNTER:
case UP_DOWN_COUNTER:
return false;
}
throw new IllegalStateException(
"Unrecognized instrument type " + getSourceInstrument().getType());
}
}

View File

@ -26,7 +26,9 @@ import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Level;
@ -38,104 +40,61 @@ import java.util.logging.Logger;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class AsynchronousMetricStorage<T> implements MetricStorage {
public class AsynchronousMetricStorage<T, O> implements MetricStorage {
private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName());
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
private final MetricDescriptor metricDescriptor;
private final ReentrantLock collectLock = new ReentrantLock();
private final AsyncAccumulator<T> asyncAccumulator;
private final List<Consumer<O>> callbacks = new CopyOnWriteArrayList<>();
private final MetricDescriptor metricDescriptor;
private final TemporalMetricStorage<T> storage;
private final Runnable metricUpdater;
/** Constructs asynchronous metric storage which stores nothing. */
public static MetricStorage empty() {
return EmptyMetricStorage.INSTANCE;
}
/** Constructs storage for {@code double} valued instruments. */
public static <T> MetricStorage doubleAsynchronousAccumulator(
View view,
InstrumentDescriptor instrument,
Consumer<ObservableDoubleMeasurement> metricUpdater) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
Aggregator<T> aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
if (Aggregator.drop() == aggregator) {
return empty();
}
AttributesProcessor attributesProcessor = view.getAttributesProcessor();
// TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
ObservableDoubleMeasurement result =
new ObservableDoubleMeasurement() {
@Override
public void record(double value, Attributes attributes) {
T accumulation =
aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
if (accumulation != null) {
measurementAccumulator.record(
attributesProcessor.process(attributes, Context.current()), accumulation);
}
}
@Override
public void record(double value) {
record(value, Attributes.empty());
}
};
return new AsynchronousMetricStorage<>(
metricDescriptor, aggregator, measurementAccumulator, () -> metricUpdater.accept(result));
}
/** Constructs storage for {@code long} valued instruments. */
public static <T> MetricStorage longAsynchronousAccumulator(
View view,
InstrumentDescriptor instrument,
Consumer<ObservableLongMeasurement> metricUpdater) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
Aggregator<T> aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
if (Aggregator.drop() == aggregator) {
return empty();
}
AttributesProcessor attributesProcessor = view.getAttributesProcessor();
// TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
ObservableLongMeasurement result =
new ObservableLongMeasurement() {
@Override
public void record(long value, Attributes attributes) {
T accumulation =
aggregator.accumulateLongMeasurement(value, attributes, Context.current());
if (accumulation != null) {
measurementAccumulator.record(
attributesProcessor.process(attributes, Context.current()), accumulation);
}
}
@Override
public void record(long value) {
record(value, Attributes.empty());
}
};
return new AsynchronousMetricStorage<>(
metricDescriptor, aggregator, measurementAccumulator, () -> metricUpdater.accept(result));
}
private final AsyncAccumulator<T> accumulator;
private final O measurement;
private AsynchronousMetricStorage(
MetricDescriptor metricDescriptor,
Aggregator<T> aggregator,
AsyncAccumulator<T> asyncAccumulator,
Runnable metricUpdater) {
AsyncAccumulator<T> accumulator,
O measurement) {
this.metricDescriptor = metricDescriptor;
this.asyncAccumulator = asyncAccumulator;
this.metricUpdater = metricUpdater;
this.storage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ false);
this.accumulator = accumulator;
this.measurement = measurement;
}
/** Create an asynchronous storage instance for double measurements. */
public static <T>
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> createDoubleAsyncStorage(
View view, InstrumentDescriptor instrument) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
// TODO: optimize when aggregator is Aggregator.drop()
Aggregator<T> aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
AsyncAccumulator<T> accumulator = new AsyncAccumulator<>(instrument);
ObservableDoubleMeasurement measurement =
new ObservableDoubleMeasurementImpl<>(
aggregator, accumulator, view.getAttributesProcessor());
return new AsynchronousMetricStorage<>(metricDescriptor, aggregator, accumulator, measurement);
}
/** Create an asynchronous storage instance for long measurements. */
public static <T> AsynchronousMetricStorage<?, ObservableLongMeasurement> createLongAsyncStorage(
View view, InstrumentDescriptor instrument) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
// TODO: optimize when aggregator is Aggregator.drop()
Aggregator<T> aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
AsyncAccumulator<T> accumulator = new AsyncAccumulator<>(instrument);
ObservableLongMeasurement measurement =
new ObservableLongMeasurementImpl<>(aggregator, accumulator, view.getAttributesProcessor());
return new AsynchronousMetricStorage<>(metricDescriptor, aggregator, accumulator, measurement);
}
@Override
public MetricDescriptor getMetricDescriptor() {
return metricDescriptor;
}
@Override
@ -151,7 +110,14 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
collectLock.lock();
try {
try {
metricUpdater.run();
boolean empty = true;
for (Consumer<O> callback : callbacks) {
empty = false;
callback.accept(measurement);
}
if (empty) {
return EmptyMetricData.getInstance();
}
} catch (Throwable e) {
propagateIfFatal(e);
throttlingLogger.log(
@ -168,7 +134,7 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
instrumentationLibraryInfo,
getMetricDescriptor(),
temporality,
asyncAccumulator.collectAndReset(),
accumulator.collectAndReset(),
startEpochNanos,
epochNanos);
} finally {
@ -176,9 +142,17 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
}
}
@Override
public MetricDescriptor getMetricDescriptor() {
return metricDescriptor;
/** Add a callback to the storage. */
public void addCallback(Consumer<O> callback) {
this.callbacks.add(callback);
}
/**
* Remove the callback from the storage. Called when {@link AutoCloseable#close()} is invoked on
* observable instruments.
*/
public void removeCallback(Consumer<O> callback) {
this.callbacks.remove(callback);
}
/** Helper class to record async measurements on demand. */
@ -192,7 +166,7 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
this.instrument = instrument;
}
public void record(Attributes attributes, T accumulation) {
void record(Attributes attributes, T accumulation) {
// Check we're under the max allowed accumulations
if (currentAccumulation.size() >= MetricStorageUtils.MAX_ACCUMULATIONS) {
throttlingLogger.log(
@ -218,10 +192,70 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
currentAccumulation.put(attributes, accumulation);
}
public Map<Attributes, T> collectAndReset() {
Map<Attributes, T> collectAndReset() {
Map<Attributes, T> result = currentAccumulation;
currentAccumulation = new HashMap<>();
return result;
}
}
private static class ObservableLongMeasurementImpl<T> implements ObservableLongMeasurement {
private final Aggregator<T> aggregator;
private final AsyncAccumulator<T> asyncAccumulator;
private final AttributesProcessor attributesProcessor;
private ObservableLongMeasurementImpl(
Aggregator<T> aggregator,
AsyncAccumulator<T> asyncAccumulator,
AttributesProcessor attributesProcessor) {
this.aggregator = aggregator;
this.asyncAccumulator = asyncAccumulator;
this.attributesProcessor = attributesProcessor;
}
@Override
public void record(long value) {
record(value, Attributes.empty());
}
@Override
public void record(long value, Attributes attributes) {
T accumulation = aggregator.accumulateLongMeasurement(value, attributes, Context.current());
if (accumulation != null) {
asyncAccumulator.record(
attributesProcessor.process(attributes, Context.current()), accumulation);
}
}
}
private static class ObservableDoubleMeasurementImpl<T> implements ObservableDoubleMeasurement {
private final Aggregator<T> aggregator;
private final AsyncAccumulator<T> asyncAccumulator;
private final AttributesProcessor attributesProcessor;
private ObservableDoubleMeasurementImpl(
Aggregator<T> aggregator,
AsyncAccumulator<T> asyncAccumulator,
AttributesProcessor attributesProcessor) {
this.aggregator = aggregator;
this.asyncAccumulator = asyncAccumulator;
this.attributesProcessor = attributesProcessor;
}
@Override
public void record(double value) {
record(value, Attributes.empty());
}
@Override
public void record(double value, Attributes attributes) {
T accumulation = aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
if (accumulation != null) {
asyncAccumulator.record(
attributesProcessor.process(attributes, Context.current()), accumulation);
}
}
}
}

View File

@ -96,12 +96,6 @@ public final class DebugUtils {
.append(existing.getSourceInstrument().getValueType())
.append("]\n");
}
if (existing.isAsync()) {
result
.append("- InstrumentType [")
.append(existing.getSourceInstrument().getType())
.append("] is async and already registered\n");
}
// Next we write out where the existing metric descriptor came from, either a raw instrument
// or a view on a raw instrument.

View File

@ -5,6 +5,8 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static java.util.stream.Collectors.toList;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
@ -19,7 +21,6 @@ import java.util.Objects;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
@ -88,7 +89,7 @@ public abstract class MeterSharedState {
.filter(m -> !m.isEmpty())
.map(this::register)
.filter(Objects::nonNull)
.collect(Collectors.toList());
.collect(toList());
if (storage.size() == 1) {
return storage.get(0);
@ -98,39 +99,60 @@ public abstract class MeterSharedState {
}
/** Registers new asynchronous storage associated with a given {@code long} instrument. */
public final void registerLongAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableLongMeasurement> metricUpdater) {
public final List<AsynchronousMetricStorage<?, ObservableLongMeasurement>>
registerLongAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableLongMeasurement> callback) {
meterProviderSharedState
.getViewRegistry()
.findViews(instrument, getInstrumentationLibraryInfo())
.stream()
.map(
view ->
AsynchronousMetricStorage.longAsynchronousAccumulator(
view, instrument, metricUpdater))
.filter(m -> !m.isEmpty())
.forEach(this::register);
List<AsynchronousMetricStorage<?, ObservableLongMeasurement>> storages =
meterProviderSharedState
.getViewRegistry()
.findViews(instrument, getInstrumentationLibraryInfo())
.stream()
.map(view -> AsynchronousMetricStorage.createLongAsyncStorage(view, instrument))
.filter(storage -> !storage.isEmpty())
.collect(toList());
List<AsynchronousMetricStorage<?, ObservableLongMeasurement>> registeredStorages =
new ArrayList<>();
for (AsynchronousMetricStorage<?, ObservableLongMeasurement> storage : storages) {
AsynchronousMetricStorage<?, ObservableLongMeasurement> registeredStorage = register(storage);
if (registeredStorage != null) {
registeredStorage.addCallback(callback);
registeredStorages.add(registeredStorage);
}
}
return registeredStorages;
}
/** Registers new asynchronous storage associated with a given {@code double} instrument. */
public final void registerDoubleAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableDoubleMeasurement> metricUpdater) {
public final List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>>
registerDoubleAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableDoubleMeasurement> callback) {
meterProviderSharedState
.getViewRegistry()
.findViews(instrument, getInstrumentationLibraryInfo())
.stream()
.map(
view ->
AsynchronousMetricStorage.doubleAsynchronousAccumulator(
view, instrument, metricUpdater))
.filter(m -> !m.isEmpty())
.forEach(this::register);
List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>> storages =
meterProviderSharedState
.getViewRegistry()
.findViews(instrument, getInstrumentationLibraryInfo())
.stream()
.map(view -> AsynchronousMetricStorage.createDoubleAsyncStorage(view, instrument))
.filter(storage -> !storage.isEmpty())
.collect(toList());
List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>> registeredStorages =
new ArrayList<>();
for (AsynchronousMetricStorage<?, ObservableDoubleMeasurement> storage : storages) {
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> registeredStorage =
register(storage);
if (registeredStorage != null) {
registeredStorage.addCallback(callback);
registeredStorages.add(registeredStorage);
}
}
return registeredStorages;
}
@Nullable

View File

@ -69,13 +69,6 @@ public class MetricStorageRegistry {
descriptor,
"Metric with same name and different descriptor already created.");
}
// Descriptors are compatible, but can't register async instruments multiple times.
if (descriptor.isAsync()) {
throw new DuplicateMetricStorageException(
oldOrNewStorage.getMetricDescriptor(),
descriptor,
"Async metric with same name has already been created.");
}
// Metric already existed, and is compatible with new storage.
return (I) oldOrNewStorage;
}

View File

@ -10,6 +10,7 @@ import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
@ -17,7 +18,7 @@ import io.opentelemetry.sdk.testing.time.TestClock;
import java.time.Duration;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link DoubleValueObserverSdk}. */
/** Unit tests for SDK {@link ObservableDoubleGauge}. */
class SdkDoubleGaugeBuilderTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
@ -33,6 +34,20 @@ class SdkDoubleGaugeBuilderTest {
.build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void removeCallback() {
ObservableDoubleGauge gauge =
sdkMeter.gaugeBuilder("testGauge").buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testGauge").hasDoubleGauge().points().hasSize(1));
gauge.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
sdkMeter
@ -44,7 +59,6 @@ class SdkDoubleGaugeBuilderTest {
}
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithOneRecord() {
sdkMeter
.gaugeBuilder("testObserver")

View File

@ -10,6 +10,7 @@ import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
@ -17,7 +18,7 @@ import io.opentelemetry.sdk.testing.time.TestClock;
import java.time.Duration;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link LongValueObserverSdk}. */
/** Unit tests for SDK {@link ObservableLongGauge}. */
class SdkLongGaugeBuilderTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
@ -33,6 +34,23 @@ class SdkLongGaugeBuilderTest {
.build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void removeCallback() {
ObservableLongGauge gauge =
sdkMeter
.gaugeBuilder("testGauge")
.ofLongs()
.buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testGauge").hasLongGauge().points().hasSize(1));
gauge.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
sdkMeter
@ -45,7 +63,6 @@ class SdkLongGaugeBuilderTest {
}
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithOneRecord() {
sdkMeter
.gaugeBuilder("testObserver")

View File

@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.baggage.Baggage;
@ -19,11 +20,13 @@ import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.view.Aggregation;
@ -35,6 +38,7 @@ import io.opentelemetry.sdk.testing.time.TestClock;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ -453,6 +457,64 @@ class SdkMeterProviderTest {
.hasValue(10.1)));
}
@Test
void removeAsyncInstrument() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
Meter meter =
sdkMeterProviderBuilder.registerMetricReader(reader).build().get(getClass().getName());
ObservableLongCounter observableCounter1 =
meter
.counterBuilder("foo")
.buildWithCallback(
measurement ->
measurement.record(10, Attributes.builder().put("callback", "one").build()));
ObservableLongCounter observableCounter2 =
meter
.counterBuilder("foo")
.buildWithCallback(
measurement ->
measurement.record(10, Attributes.builder().put("callback", "two").build()));
assertThat(reader.collectAllMetrics())
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasLongSum()
.points()
.hasSize(2)
.satisfiesExactlyInAnyOrder(
pointData ->
assertThat(pointData)
.hasAttributes(Attributes.builder().put("callback", "one").build()),
(Consumer<LongPointData>)
longPointData ->
assertThat(longPointData)
.hasAttributes(
Attributes.builder().put("callback", "two").build())));
observableCounter1.close();
assertThat(reader.collectAllMetrics())
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasLongSum()
.points()
.hasSize(1)
.satisfiesExactlyInAnyOrder(
(Consumer<LongPointData>)
longPointData ->
assertThat(longPointData)
.hasAttributes(
Attributes.builder().put("callback", "two").build())));
observableCounter2.close();
assertThat(reader.collectAllMetrics()).hasSize(0);
}
@Test
void viewSdk_AllowRenames() {
InMemoryMetricReader reader = InMemoryMetricReader.create();

View File

@ -32,6 +32,27 @@ class SdkObservableDoubleCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
@Test
void removeCallback() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
ObservableDoubleCounter counter =
sdkMeterProviderBuilder
.registerMetricReader(sdkMeterReader)
.build()
.get(getClass().getName())
.counterBuilder("testCounter")
.ofDoubles()
.buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testCounter").hasDoubleSum().points().hasSize(1));
counter.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();

View File

@ -7,6 +7,7 @@ package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
@ -32,6 +33,27 @@ class SdkObservableDoubleUpDownCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
@Test
void removeCallback() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
ObservableDoubleUpDownCounter counter =
sdkMeterProviderBuilder
.registerMetricReader(sdkMeterReader)
.build()
.get(getClass().getName())
.upDownCounterBuilder("testCounter")
.ofDoubles()
.buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testCounter").hasDoubleSum().points().hasSize(1));
counter.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();

View File

@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import java.util.Arrays;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;
class SdkObservableInstrumentTest {
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(SdkObservableInstrument.class);
@Test
@SuppressWarnings("unchecked")
void close() {
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> storage1 =
mock(AsynchronousMetricStorage.class);
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> storage2 =
mock(AsynchronousMetricStorage.class);
Consumer<ObservableDoubleMeasurement> callback = unused -> {};
SdkObservableInstrument<ObservableDoubleMeasurement> observableInstrument =
new SdkObservableInstrument<>("my-instrument", Arrays.asList(storage1, storage2), callback);
// First call to close should trigger remove from storage
observableInstrument.close();
verify(storage1).removeCallback(callback);
verify(storage2).removeCallback(callback);
logs.assertDoesNotContain("Instrument my-instrument has called close() multiple times.");
// Close a second time should not trigger remove from storage and should log a warning
Mockito.reset(storage1, storage2);
observableInstrument.close();
verify(storage1, never()).removeCallback(any());
verify(storage2, never()).removeCallback(any());
logs.assertContains("Instrument my-instrument has called close() multiple times.");
}
}

View File

@ -32,6 +32,26 @@ class SdkObservableLongCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
@Test
void removeCallback() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
ObservableLongCounter counter =
sdkMeterProviderBuilder
.registerMetricReader(sdkMeterReader)
.build()
.get(getClass().getName())
.counterBuilder("testCounter")
.buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testCounter").hasLongSum().points().hasSize(1));
counter.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();

View File

@ -32,6 +32,26 @@ class SdkObservableLongUpDownCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
@Test
void removeCallback() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
ObservableLongUpDownCounter counter =
sdkMeterProviderBuilder
.registerMetricReader(sdkMeterReader)
.build()
.get(getClass().getName())
.upDownCounterBuilder("testCounter")
.buildWithCallback(measurement -> measurement.record(10));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
metric -> assertThat(metric).hasName("testCounter").hasLongSum().points().hasSize(1));
counter.close();
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();

View File

@ -123,22 +123,4 @@ class MetricDescriptorTest {
InstrumentValueType.LONG))))
.isFalse();
}
@Test
void isAsync() {
assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER).isAsync())
.isTrue();
assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_GAUGE).isAsync()).isTrue();
assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_COUNTER).isAsync()).isTrue();
assertThat(descriptorForInstrument(InstrumentType.HISTOGRAM).isAsync()).isFalse();
assertThat(descriptorForInstrument(InstrumentType.COUNTER).isAsync()).isFalse();
assertThat(descriptorForInstrument(InstrumentType.UP_DOWN_COUNTER).isAsync()).isFalse();
}
private static MetricDescriptor descriptorForInstrument(InstrumentType instrumentType) {
InstrumentDescriptor instrument =
InstrumentDescriptor.create(
"name", "description", "unit", instrumentType, InstrumentValueType.DOUBLE);
return MetricDescriptor.create(View.builder().build(), instrument);
}
}

View File

@ -6,9 +6,12 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
@ -28,12 +31,13 @@ import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.time.TestClock;
import java.util.Set;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@ -42,7 +46,6 @@ public class AsynchronousMetricStorageTest {
private MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState =
MeterSharedState.create(InstrumentationLibraryInfo.empty());
private AttributesProcessor spyAttributesProcessor;
private View view;
private CollectionHandle handle;
private Set<CollectionHandle> all;
@ -51,10 +54,10 @@ public class AsynchronousMetricStorageTest {
LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class);
@Mock private MetricReader reader;
@Spy private AttributesProcessor spyAttributesProcessor = AttributesProcessor.noop();
@BeforeEach
void setup() {
spyAttributesProcessor = Mockito.spy(AttributesProcessor.noop());
view =
View.builder()
.setAggregation(Aggregation.lastValue())
@ -80,62 +83,70 @@ public class AsynchronousMetricStorageTest {
@Test
void doubleAsynchronousAccumulator_AttributesProcessor_used() {
AsynchronousMetricStorage.doubleAsynchronousAccumulator(
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> doubleAsyncStorage =
AsynchronousMetricStorage.createDoubleAsyncStorage(
view,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.DOUBLE),
value -> value.record(1.0, Attributes.empty()))
.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.now(),
false);
Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
InstrumentValueType.DOUBLE));
doubleAsyncStorage.addCallback(value -> value.record(1.0, Attributes.empty()));
doubleAsyncStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.now(),
/* suppressSynchronousCollection= */ false);
verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
@Test
void longAsynchronousAccumulator_AttributesProcessor_used() {
AsynchronousMetricStorage.longAsynchronousAccumulator(
AsynchronousMetricStorage<?, ObservableLongMeasurement> longAsyncStorage =
AsynchronousMetricStorage.createLongAsyncStorage(
view,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG),
value -> value.record(1, Attributes.empty()))
.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
false);
Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
InstrumentValueType.LONG));
longAsyncStorage.addCallback(value -> value.record(1, Attributes.empty()));
longAsyncStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
/* suppressSynchronousCollection= */ false);
verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
@Test
void collectAndReset_IgnoresDuplicates() {
MetricStorage metricStorage =
AsynchronousMetricStorage.longAsynchronousAccumulator(
void collectAndReset_CallsMultipleCallbacks() {
AsynchronousMetricStorage<?, ObservableLongMeasurement> metricStorage =
AsynchronousMetricStorage.createLongAsyncStorage(
view,
InstrumentDescriptor.create(
"my-instrument",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG),
measurement -> {
measurement.record(1, Attributes.builder().put("key", "a").build());
measurement.record(2, Attributes.builder().put("key", "a").build());
measurement.record(3, Attributes.builder().put("key", "b").build());
});
InstrumentValueType.LONG));
// Callbacks partially overlap for the metrics they record, should take first registered
Consumer<ObservableLongMeasurement> callback1 =
measurement -> measurement.record(1, Attributes.builder().put("key", "a").build());
Consumer<ObservableLongMeasurement> callback2 =
measurement -> {
measurement.record(3, Attributes.builder().put("key", "a").build());
measurement.record(3, Attributes.builder().put("key", "b").build());
};
metricStorage.addCallback(callback1);
metricStorage.addCallback(callback2);
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
@ -143,7 +154,70 @@ public class AsynchronousMetricStorageTest {
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
false))
/* suppressSynchronousCollection= */ false))
.satisfies(
metricData ->
assertThat(metricData.getLongGaugeData().getPoints())
.satisfiesExactlyInAnyOrder(
dataPoint -> {
assertThat(dataPoint.getValue()).isEqualTo(1);
assertThat(dataPoint.getAttributes())
.isEqualTo(Attributes.builder().put("key", "a").build());
},
dataPoint -> {
assertThat(dataPoint.getValue()).isEqualTo(3);
assertThat(dataPoint.getAttributes())
.isEqualTo(Attributes.builder().put("key", "b").build());
}));
logs.assertContains(
"Instrument my-instrument has recorded multiple values for the same attributes.");
// Remove callback2, verify only callback1 is called
metricStorage.removeCallback(callback2);
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
/* suppressSynchronousCollection= */ false))
.satisfies(
metricData ->
assertThat(metricData.getLongGaugeData().getPoints())
.satisfiesExactlyInAnyOrder(
dataPoint -> {
assertThat(dataPoint.getValue()).isEqualTo(1);
assertThat(dataPoint.getAttributes())
.isEqualTo(Attributes.builder().put("key", "a").build());
}));
}
@Test
void collectAndReset_IgnoresDuplicates() {
AsynchronousMetricStorage<?, ObservableLongMeasurement> metricStorage =
AsynchronousMetricStorage.createLongAsyncStorage(
view,
InstrumentDescriptor.create(
"my-instrument",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG));
metricStorage.addCallback(
measurement -> {
measurement.record(1, Attributes.builder().put("key", "a").build());
measurement.record(2, Attributes.builder().put("key", "a").build());
measurement.record(3, Attributes.builder().put("key", "b").build());
});
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
/* suppressSynchronousCollection= */ false))
.satisfies(
metricData ->
assertThat(metricData.getLongGaugeData().getPoints())
@ -165,18 +239,19 @@ public class AsynchronousMetricStorageTest {
@Test
@SuppressLogger(AsynchronousMetricStorage.class)
void collectAndReset_CallbackException() {
MetricStorage metricStorage =
AsynchronousMetricStorage.longAsynchronousAccumulator(
AsynchronousMetricStorage<?, ObservableDoubleMeasurement> metricStorage =
AsynchronousMetricStorage.createDoubleAsyncStorage(
view,
InstrumentDescriptor.create(
"my-instrument",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG),
unused -> {
throw new RuntimeException("Error!");
});
InstrumentValueType.LONG));
metricStorage.addCallback(
unused -> {
throw new RuntimeException("Error!");
});
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
@ -184,7 +259,7 @@ public class AsynchronousMetricStorageTest {
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
false))
/* suppressSynchronousCollection= */ false))
.isEqualTo(EmptyMetricData.getInstance());
logs.assertContains("An exception occurred invoking callback for instrument my-instrument.");
}

View File

@ -28,52 +28,49 @@ class MetricStorageRegistryTest {
descriptor("sync", "other_description", InstrumentType.COUNTER);
private static final MetricDescriptor ASYNC_DESCRIPTOR =
descriptor("async", "description", InstrumentType.OBSERVABLE_GAUGE);
private static final MetricDescriptor OTHER_ASYNC_DESCRIPTOR =
descriptor("async", "other_description", InstrumentType.OBSERVABLE_GAUGE);
private final MeterSharedState meterSharedState =
MeterSharedState.create(InstrumentationLibraryInfo.empty());
private final MetricStorageRegistry metricStorageRegistry = new MetricStorageRegistry();
@Test
void register_Sync() {
TestMetricStorage testInstrument = new TestMetricStorage(SYNC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(
meterSharedState
.getMetricStorageRegistry()
.register(new TestMetricStorage(SYNC_DESCRIPTOR)))
.isSameAs(testInstrument);
TestMetricStorage storage = new TestMetricStorage(SYNC_DESCRIPTOR);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThat(metricStorageRegistry.register(new TestMetricStorage(SYNC_DESCRIPTOR)))
.isSameAs(storage);
}
@Test
void register_SyncIncompatibleDescriptor() {
TestMetricStorage testInstrument = new TestMetricStorage(SYNC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
TestMetricStorage storage = new TestMetricStorage(SYNC_DESCRIPTOR);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThatThrownBy(
() ->
meterSharedState
.getMetricStorageRegistry()
.register(new TestMetricStorage(OTHER_SYNC_DESCRIPTOR)))
() -> metricStorageRegistry.register(new TestMetricStorage(OTHER_SYNC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
void register_Async() {
TestMetricStorage testInstrument = new TestMetricStorage(ASYNC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
TestMetricStorage storage = new TestMetricStorage(ASYNC_DESCRIPTOR);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThat(metricStorageRegistry.register(new TestMetricStorage(ASYNC_DESCRIPTOR)))
.isSameAs(storage);
}
@Test
void register_AsyncIncompatibleDescriptor() {
TestMetricStorage storage = new TestMetricStorage(ASYNC_DESCRIPTOR);
assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThatThrownBy(
() ->
meterSharedState
.getMetricStorageRegistry()
.register(new TestMetricStorage(ASYNC_DESCRIPTOR)))
() -> metricStorageRegistry.register(new TestMetricStorage(OTHER_ASYNC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Async metric with same name has already been created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
private static MetricDescriptor descriptor(