Overhaul sdk.metrics package for new SDK: Decouple Instrument + Storage per-metric. (#3455)

* Peel away dependency between instrument API hooks and aggregator package.

- Create new BoundStorageHandle interface for synchronous instruments to use.
- Update all SDK instruments to use it.

* Set up interfaces for storage registry.

* Create MetricStorage abstraction

- Wire all SDK instruments through a MetricStorage abstraction.
- Move all wiring of measurements -> aggregators into an internal package.
- Migrate previous "Accumulation" classes to "storage" classes
- Do just enough to get things compiling (not tested yet)

Goal is to have the "Aggregator" package not referenced in the sdk.metrics package directly in the future.

* Fix tests for metric storage refactoring.

- Move tests to appropriate location
- For now just make view-things public
- Fix possible bug in test of labels processor + appending attributes in a test....

* Remove instrument registry (now metric registry)

* Rename DoubleValueRecorder => DoubleHistogram in SDK

* Rename LongValueRecorderSdk => LongHistogramSdk.

* Rename ValueObserver -> Gauge

* Rename instrument type enum

Update enum to match specification.

* Move ViewRegistry + Helper into internal package

* Fixes to benchmarks

* Add missing javadoc

* Remove references to Labels

* Fixes from review.

* Rework tryUnmap to test the exposed interface
This commit is contained in:
Josh Suereth 2021-08-10 16:23:08 -04:00 committed by GitHub
parent 171c62cf05
commit 0752a4bb17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
70 changed files with 1104 additions and 960 deletions

View File

@ -35,7 +35,7 @@ public enum TestSdk {
.registerView(
InstrumentSelector.builder()
.setInstrumentNameRegex(".*histogram_recorder")
.setInstrumentType(InstrumentType.VALUE_RECORDER)
.setInstrumentType(InstrumentType.HISTOGRAM)
.build(),
// Histogram buckets the same as the metrics prototype/prometheus.
View.builder()

View File

@ -37,7 +37,7 @@ public class DoubleHistogramBenchmark {
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.DOUBLE));
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

View File

@ -35,7 +35,7 @@ public class DoubleMinMaxSumCountBenchmark {
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.DOUBLE));
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;

View File

@ -32,11 +32,7 @@ public class LongMinMaxSumCountBenchmark {
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"1",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.LONG));
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.LONG));
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;
@Setup(Level.Trial)

View File

@ -1,49 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;
abstract class AbstractAccumulator {
/**
* Returns the list of metrics collected.
*
* @return returns the list of metrics collected.
*/
abstract List<MetricData> collectAll(long epochNanos);
static <T> Aggregator<T> getAggregator(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) {
return meterProviderSharedState
.getViewRegistry()
.findView(descriptor)
.getAggregatorFactory()
.create(
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
descriptor);
}
static LabelsProcessor getLabelsProcessor(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) {
return meterProviderSharedState
.getViewRegistry()
.findView(descriptor)
.getLabelsProcessorFactory()
.create(
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
descriptor);
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List;
abstract class AbstractAsynchronousInstrument extends AbstractInstrument {
private final AsynchronousInstrumentAccumulator accumulator;
AbstractAsynchronousInstrument(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor);
this.accumulator = accumulator;
}
@Override
final List<MetricData> collectAll(long epochNanos) {
return accumulator.collectAll(epochNanos);
}
}

View File

@ -6,8 +6,6 @@
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List;
abstract class AbstractInstrument implements Instrument {
@ -22,11 +20,6 @@ abstract class AbstractInstrument implements Instrument {
return descriptor;
}
/**
* Collects records from all the entries (labelSet, Bound) that changed since the previous call.
*/
abstract List<MetricData> collectAll(long epochNanos);
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -10,6 +10,9 @@ import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@ -58,43 +61,25 @@ public abstract class AbstractInstrumentBuilder<BuilderT extends AbstractInstrum
final <I extends AbstractInstrument> I buildSynchronousInstrument(
InstrumentType type,
InstrumentValueType valueType,
BiFunction<InstrumentDescriptor, SynchronousInstrumentAccumulator<?>, I> instrumentFactory) {
BiFunction<InstrumentDescriptor, WriteableMetricStorage, I> instrumentFactory) {
InstrumentDescriptor descriptor = makeDescriptor(type, valueType);
return meterSharedState
.getInstrumentRegistry()
.register(
instrumentFactory.apply(
descriptor,
SynchronousInstrumentAccumulator.create(
meterProviderSharedState, meterSharedState, descriptor)));
WriteableMetricStorage storage =
meterSharedState.registerSynchronousMetricStorage(descriptor, meterProviderSharedState);
return instrumentFactory.apply(descriptor, storage);
}
final <I extends AbstractInstrument> I buildDoubleAsynchronousInstrument(
InstrumentType type,
Consumer<ObservableDoubleMeasurement> updater,
BiFunction<InstrumentDescriptor, AsynchronousInstrumentAccumulator, I> instrumentFactory) {
final void registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.DOUBLE);
return meterSharedState
.getInstrumentRegistry()
.register(
instrumentFactory.apply(
descriptor,
AsynchronousInstrumentAccumulator.doubleAsynchronousAccumulator(
meterProviderSharedState, meterSharedState, descriptor, updater)));
meterSharedState.registerDoubleAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}
final <I extends AbstractInstrument> I buildLongAsynchronousInstrument(
InstrumentType type,
Consumer<ObservableLongMeasurement> updater,
BiFunction<InstrumentDescriptor, AsynchronousInstrumentAccumulator, I> instrumentFactory) {
final void registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.LONG);
return meterSharedState
.getInstrumentRegistry()
.register(
instrumentFactory.apply(
descriptor,
AsynchronousInstrumentAccumulator.longAsynchronousAccumulator(
meterProviderSharedState, meterSharedState, descriptor, updater)));
meterSharedState.registerLongAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}
@FunctionalInterface

View File

@ -1,31 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List;
abstract class AbstractSynchronousInstrument extends AbstractInstrument {
private final SynchronousInstrumentAccumulator<?> accumulator;
AbstractSynchronousInstrument(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor);
this.accumulator = accumulator;
}
@Override
final List<MetricData> collectAll(long epochNanos) {
return accumulator.collectAll(epochNanos);
}
AggregatorHandle<?> acquireHandle(Attributes labels) {
return accumulator.bind(labels);
}
}

View File

@ -12,27 +12,32 @@ import io.opentelemetry.api.metrics.DoubleCounterBuilder;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class DoubleCounterSdk extends AbstractSynchronousInstrument implements DoubleCounter {
final class DoubleCounterSdk extends AbstractInstrument implements DoubleCounter {
private final WriteableMetricStorage storage;
private DoubleCounterSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private DoubleCounterSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void add(double increment, Attributes labels, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(labels);
public void add(double increment, Attributes attributes, Context context) {
BoundStorageHandle aggregatorHandle = storage.bind(attributes);
try {
if (increment < 0) {
throw new IllegalArgumentException("Counters can only increase");
}
aggregatorHandle.recordDouble(increment);
aggregatorHandle.recordDouble(increment, attributes, context);
} finally {
aggregatorHandle.release();
}
@ -49,15 +54,17 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument implements Do
}
@Override
public BoundDoubleCounter bind(Attributes labels) {
return new BoundInstrument(acquireHandle(labels));
public BoundDoubleCounter bind(Attributes attributes) {
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundDoubleCounter {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle handle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.handle = handle;
this.attributes = attributes;
}
@Override
@ -65,7 +72,7 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument implements Do
if (increment < 0) {
throw new IllegalArgumentException("Counters can only increase");
}
aggregatorHandle.recordDouble(increment);
handle.recordDouble(increment, attributes, context);
}
@Override
@ -75,7 +82,7 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument implements Do
@Override
public void unbind() {
aggregatorHandle.release();
handle.release();
}
}
@ -116,8 +123,7 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument implements Do
@Override
public void buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
buildDoubleAsynchronousInstrument(
InstrumentType.SUM_OBSERVER, callback, DoubleSumObserverSdk::new);
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_SUM, callback);
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.LongGaugeBuilder;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.function.Consumer;
final class DoubleGaugeBuilderSdk extends AbstractInstrumentBuilder<DoubleGaugeBuilderSdk>
implements DoubleGaugeBuilder {
DoubleGaugeBuilderSdk(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
String name) {
this(meterProviderSharedState, meterSharedState, name, "", "1");
}
DoubleGaugeBuilderSdk(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
String name,
String description,
String unit) {
super(meterProviderSharedState, sharedState, name, description, unit);
}
@Override
protected DoubleGaugeBuilderSdk getThis() {
return this;
}
@Override
public LongGaugeBuilder ofLongs() {
return swapBuilder(LongGaugeBuilderSdk::new);
}
@Override
public void buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
}
}

View File

@ -11,32 +11,30 @@ import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument
implements DoubleHistogram {
final class DoubleHistogramSdk extends AbstractInstrument implements DoubleHistogram {
private final WriteableMetricStorage storage;
private DoubleValueRecorderSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private DoubleHistogramSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void record(double value, Attributes labels, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(labels);
try {
aggregatorHandle.recordDouble(value);
} finally {
aggregatorHandle.release();
}
public void record(double value, Attributes attributes, Context context) {
storage.recordDouble(value, attributes, context);
}
@Override
public void record(double value, Attributes labels) {
record(value, labels, Context.current());
public void record(double value, Attributes attributes) {
record(value, attributes, Context.current());
}
@Override
@ -45,20 +43,22 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument
}
@Override
public BoundDoubleHistogram bind(Attributes labels) {
return new BoundInstrument(acquireHandle(labels));
public BoundDoubleHistogram bind(Attributes attributes) {
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundDoubleHistogram {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle aggregatorHandle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.aggregatorHandle = handle;
this.attributes = attributes;
}
@Override
public void record(double value, Context context) {
aggregatorHandle.recordDouble(value);
aggregatorHandle.recordDouble(value, attributes, context);
}
@Override
@ -72,7 +72,7 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument
}
}
static final class Builder extends AbstractInstrumentBuilder<DoubleValueRecorderSdk.Builder>
static final class Builder extends AbstractInstrumentBuilder<DoubleHistogramSdk.Builder>
implements DoubleHistogramBuilder {
Builder(
@ -97,14 +97,14 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument
}
@Override
public DoubleValueRecorderSdk build() {
public DoubleHistogramSdk build() {
return buildSynchronousInstrument(
InstrumentType.VALUE_RECORDER, InstrumentValueType.DOUBLE, DoubleValueRecorderSdk::new);
InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE, DoubleHistogramSdk::new);
}
@Override
public LongHistogramBuilder ofLongs() {
return swapBuilder(LongValueRecorderSdk.Builder::new);
return swapBuilder(LongHistogramSdk.Builder::new);
}
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
final class DoubleSumObserverSdk extends AbstractAsynchronousInstrument {
DoubleSumObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
}

View File

@ -12,28 +12,26 @@ import io.opentelemetry.api.metrics.DoubleUpDownCounterBuilder;
import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument
implements DoubleUpDownCounter {
final class DoubleUpDownCounterSdk extends AbstractInstrument implements DoubleUpDownCounter {
private final WriteableMetricStorage storage;
private DoubleUpDownCounterSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private DoubleUpDownCounterSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void add(double increment, Attributes labels, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(labels);
try {
aggregatorHandle.recordDouble(increment);
} finally {
aggregatorHandle.release();
}
public void add(double increment, Attributes attributes, Context context) {
storage.recordDouble(increment, attributes, context);
}
@Override
@ -47,20 +45,22 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument
}
@Override
public BoundDoubleUpDownCounter bind(Attributes labels) {
return new BoundInstrument(acquireHandle(labels));
public BoundDoubleUpDownCounter bind(Attributes attributes) {
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundDoubleUpDownCounter {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle handle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.handle = handle;
this.attributes = attributes;
}
@Override
public void add(double increment, Context context) {
aggregatorHandle.recordDouble(increment);
handle.recordDouble(increment, attributes, context);
}
@Override
@ -70,7 +70,7 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument
@Override
public void unbind() {
aggregatorHandle.release();
handle.release();
}
}
@ -111,8 +111,7 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument
@Override
public void buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
buildDoubleAsynchronousInstrument(
InstrumentType.UP_DOWN_SUM_OBSERVER, callback, DoubleUpDownSumObserverSdk::new);
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_SUM, callback);
}
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
final class DoubleUpDownSumObserverSdk extends AbstractAsynchronousInstrument {
DoubleUpDownSumObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.LongGaugeBuilder;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import java.util.function.Consumer;
final class DoubleValueObserverSdk extends AbstractAsynchronousInstrument {
DoubleValueObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
static final class Builder extends AbstractInstrumentBuilder<DoubleValueObserverSdk.Builder>
implements DoubleGaugeBuilder {
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
String name) {
this(meterProviderSharedState, meterSharedState, name, "", "1");
}
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
String name,
String description,
String unit) {
super(meterProviderSharedState, sharedState, name, description, unit);
}
@Override
protected Builder getThis() {
return this;
}
@Override
public LongGaugeBuilder ofLongs() {
return swapBuilder(LongValueObserverSdk.Builder::new);
}
@Override
public void buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
buildDoubleAsynchronousInstrument(
InstrumentType.VALUE_OBSERVER, callback, DoubleValueObserverSdk::new);
}
}
}

View File

@ -1,56 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Basic registry class for metrics instruments. The current implementation allows instruments to be
* registered only once for a given name.
*
* <p>TODO: Discuss what is the right behavior when an already registered Instrument with the same
* name is present. TODO: Decide what is the identifier for an Instrument? Only name?
*/
final class InstrumentRegistry {
private final ConcurrentMap<String, AbstractInstrument> registry = new ConcurrentHashMap<>();
/**
* Registers the given {@code instrument} to this registry. Returns the registered instrument if
* no other instrument with the same name is registered or a previously registered instrument with
* same name and equal with the current instrument, otherwise throws an exception.
*
* @param instrument the newly created {@code Instrument}.
* @return the given instrument if no instrument with same name already registered, otherwise the
* previous registered instrument.
* @throws IllegalArgumentException if instrument cannot be registered.
*/
@SuppressWarnings("unchecked")
<I extends AbstractInstrument> I register(I instrument) {
AbstractInstrument oldInstrument =
registry.putIfAbsent(instrument.getDescriptor().getName().toLowerCase(), instrument);
if (oldInstrument != null) {
if (!instrument.getClass().isInstance(oldInstrument) || !instrument.equals(oldInstrument)) {
throw new IllegalArgumentException(
"Instrument with same name and different descriptor already created.");
}
return (I) oldInstrument;
}
return instrument;
}
/**
* Returns a {@code Collection} view of the registered instruments.
*
* @return a {@code Collection} view of the registered instruments.
*/
Collection<AbstractInstrument> getInstruments() {
return Collections.unmodifiableCollection(new ArrayList<>(registry.values()));
}
}

View File

@ -12,30 +12,29 @@ import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class LongCounterSdk extends AbstractSynchronousInstrument implements LongCounter {
final class LongCounterSdk extends AbstractInstrument implements LongCounter {
private final WriteableMetricStorage storage;
private LongCounterSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private LongCounterSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void add(long increment, Attributes labels, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(labels);
try {
if (increment < 0) {
throw new IllegalArgumentException("Counters can only increase");
}
aggregatorHandle.recordLong(increment);
} finally {
aggregatorHandle.release();
public void add(long increment, Attributes attributes, Context context) {
if (increment < 0) {
throw new IllegalArgumentException("Counters can only increase");
}
storage.recordLong(increment, attributes, context);
}
@Override
@ -49,15 +48,17 @@ final class LongCounterSdk extends AbstractSynchronousInstrument implements Long
}
@Override
public BoundLongCounter bind(Attributes labels) {
return new BoundInstrument(acquireHandle(labels));
public BoundLongCounter bind(Attributes attributes) {
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundLongCounter {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle handle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.handle = handle;
this.attributes = attributes;
}
@Override
@ -65,7 +66,7 @@ final class LongCounterSdk extends AbstractSynchronousInstrument implements Long
if (increment < 0) {
throw new IllegalArgumentException("Counters can only increase");
}
aggregatorHandle.recordLong(increment);
handle.recordLong(increment, attributes, context);
}
@Override
@ -75,7 +76,7 @@ final class LongCounterSdk extends AbstractSynchronousInstrument implements Long
@Override
public void unbind() {
aggregatorHandle.release();
handle.release();
}
}
@ -116,8 +117,7 @@ final class LongCounterSdk extends AbstractSynchronousInstrument implements Long
@Override
public void buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
buildLongAsynchronousInstrument(
InstrumentType.SUM_OBSERVER, callback, LongSumObserverSdk::new);
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_SUM, callback);
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.LongGaugeBuilder;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.function.Consumer;
final class LongGaugeBuilderSdk extends AbstractInstrumentBuilder<LongGaugeBuilderSdk>
implements LongGaugeBuilder {
LongGaugeBuilderSdk(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
String name) {
this(meterProviderSharedState, meterSharedState, name, "", "1");
}
LongGaugeBuilderSdk(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
String name,
String description,
String unit) {
super(meterProviderSharedState, sharedState, name, description, unit);
}
@Override
protected LongGaugeBuilderSdk getThis() {
return this;
}
@Override
public DoubleGaugeBuilder ofDoubles() {
return swapBuilder(DoubleGaugeBuilderSdk::new);
}
@Override
public void buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
}
}

View File

@ -11,26 +11,25 @@ import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongHistogramBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
final class LongValueRecorderSdk extends AbstractSynchronousInstrument implements LongHistogram {
final class LongHistogramSdk extends AbstractInstrument implements LongHistogram {
private final WriteableMetricStorage storage;
private LongValueRecorderSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private LongHistogramSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void record(long value, Attributes attributes, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(attributes);
try {
aggregatorHandle.recordLong(value);
} finally {
aggregatorHandle.release();
}
storage.recordLong(value, attributes, context);
}
@Override
@ -45,19 +44,21 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument implement
@Override
public BoundLongHistogram bind(Attributes attributes) {
return new BoundInstrument(acquireHandle(attributes));
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundLongHistogram {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle handle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.handle = handle;
this.attributes = attributes;
}
@Override
public void record(long value, Context context) {
aggregatorHandle.recordLong(value);
handle.recordLong(value, attributes, context);
}
@Override
@ -67,11 +68,11 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument implement
@Override
public void unbind() {
aggregatorHandle.release();
handle.release();
}
}
static final class Builder extends AbstractInstrumentBuilder<LongValueRecorderSdk.Builder>
static final class Builder extends AbstractInstrumentBuilder<LongHistogramSdk.Builder>
implements LongHistogramBuilder {
Builder(
@ -96,14 +97,14 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument implement
}
@Override
public LongValueRecorderSdk build() {
public LongHistogramSdk build() {
return buildSynchronousInstrument(
InstrumentType.VALUE_RECORDER, InstrumentValueType.LONG, LongValueRecorderSdk::new);
InstrumentType.HISTOGRAM, InstrumentValueType.LONG, LongHistogramSdk::new);
}
@Override
public DoubleHistogramBuilder ofDoubles() {
return swapBuilder(DoubleValueRecorderSdk.Builder::new);
return swapBuilder(DoubleHistogramSdk.Builder::new);
}
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
final class LongSumObserverSdk extends AbstractAsynchronousInstrument {
LongSumObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
}

View File

@ -12,28 +12,26 @@ import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.function.Consumer;
final class LongUpDownCounterSdk extends AbstractSynchronousInstrument
implements LongUpDownCounter {
final class LongUpDownCounterSdk extends AbstractInstrument implements LongUpDownCounter {
private final WriteableMetricStorage storage;
private LongUpDownCounterSdk(
InstrumentDescriptor descriptor, SynchronousInstrumentAccumulator<?> accumulator) {
super(descriptor, accumulator);
private LongUpDownCounterSdk(InstrumentDescriptor descriptor, WriteableMetricStorage storage) {
super(descriptor);
this.storage = storage;
}
@Override
public void add(long increment, Attributes labels, Context context) {
AggregatorHandle<?> aggregatorHandle = acquireHandle(labels);
try {
aggregatorHandle.recordLong(increment);
} finally {
aggregatorHandle.release();
}
public void add(long increment, Attributes attributes, Context context) {
storage.recordLong(increment, attributes, context);
}
@Override
@ -47,20 +45,22 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument
}
@Override
public BoundLongUpDownCounter bind(Attributes labels) {
return new BoundInstrument(acquireHandle(labels));
public BoundLongUpDownCounter bind(Attributes attributes) {
return new BoundInstrument(storage.bind(attributes), attributes);
}
static final class BoundInstrument implements BoundLongUpDownCounter {
private final AggregatorHandle<?> aggregatorHandle;
private final BoundStorageHandle handle;
private final Attributes attributes;
BoundInstrument(AggregatorHandle<?> aggregatorHandle) {
this.aggregatorHandle = aggregatorHandle;
BoundInstrument(BoundStorageHandle handle, Attributes attributes) {
this.handle = handle;
this.attributes = attributes;
}
@Override
public void add(long increment, Context context) {
aggregatorHandle.recordLong(increment);
handle.recordLong(increment, attributes, context);
}
@Override
@ -70,7 +70,7 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument
@Override
public void unbind() {
aggregatorHandle.release();
handle.release();
}
}
@ -111,8 +111,7 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument
@Override
public void buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
buildLongAsynchronousInstrument(
InstrumentType.UP_DOWN_SUM_OBSERVER, callback, LongUpDownSumObserverSdk::new);
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_SUM, callback);
}
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
final class LongUpDownSumObserverSdk extends AbstractAsynchronousInstrument {
LongUpDownSumObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.LongGaugeBuilder;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import java.util.function.Consumer;
final class LongValueObserverSdk extends AbstractAsynchronousInstrument {
LongValueObserverSdk(
InstrumentDescriptor descriptor, AsynchronousInstrumentAccumulator accumulator) {
super(descriptor, accumulator);
}
static final class Builder extends AbstractInstrumentBuilder<LongValueObserverSdk.Builder>
implements LongGaugeBuilder {
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
String name) {
this(meterProviderSharedState, meterSharedState, name, "", "1");
}
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
String name,
String description,
String unit) {
super(meterProviderSharedState, sharedState, name, description, unit);
}
@Override
protected Builder getThis() {
return this;
}
@Override
public DoubleGaugeBuilder ofDoubles() {
return swapBuilder(DoubleValueObserverSdk.Builder::new);
}
@Override
public void buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
buildLongAsynchronousInstrument(
InstrumentType.VALUE_OBSERVER, callback, LongValueObserverSdk::new);
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.resources.Resource;
import javax.annotation.concurrent.Immutable;
@AutoValue
@Immutable
abstract class MeterProviderSharedState {
static MeterProviderSharedState create(
Clock clock, Resource resource, ViewRegistry viewRegistry) {
return new AutoValue_MeterProviderSharedState(clock, resource, viewRegistry, clock.now());
}
abstract Clock getClock();
abstract Resource getResource();
abstract ViewRegistry getViewRegistry();
abstract long getStartEpochNanos();
}

View File

@ -1,22 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import javax.annotation.concurrent.Immutable;
@AutoValue
@Immutable
abstract class MeterSharedState {
static MeterSharedState create(InstrumentationLibraryInfo instrumentationLibraryInfo) {
return new AutoValue_MeterSharedState(instrumentationLibraryInfo, new InstrumentRegistry());
}
abstract InstrumentationLibraryInfo getInstrumentationLibraryInfo();
abstract InstrumentRegistry getInstrumentRegistry();
}

View File

@ -12,9 +12,9 @@ import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.ArrayList;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.Collection;
import java.util.List;
/** {@link SdkMeter} is SDK implementation of {@link Meter}. */
final class SdkMeter implements Meter {
@ -28,19 +28,14 @@ final class SdkMeter implements Meter {
this.meterSharedState = MeterSharedState.create(instrumentationLibraryInfo);
}
// Only used in testing....
InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
return meterSharedState.getInstrumentationLibraryInfo();
}
/** Collects all the metric recordings that changed since the previous call. */
Collection<MetricData> collectAll(long epochNanos) {
InstrumentRegistry instrumentRegistry = meterSharedState.getInstrumentRegistry();
Collection<AbstractInstrument> instruments = instrumentRegistry.getInstruments();
List<MetricData> result = new ArrayList<>(instruments.size());
for (AbstractInstrument instrument : instruments) {
result.addAll(instrument.collectAll(epochNanos));
}
return result;
return meterSharedState.collectAll(meterProviderSharedState, epochNanos);
}
@Override
@ -55,11 +50,11 @@ final class SdkMeter implements Meter {
@Override
public DoubleHistogramBuilder histogramBuilder(String name) {
return new DoubleValueRecorderSdk.Builder(meterProviderSharedState, meterSharedState, name);
return new DoubleHistogramSdk.Builder(meterProviderSharedState, meterSharedState, name);
}
@Override
public DoubleGaugeBuilder gaugeBuilder(String name) {
return new DoubleValueObserverSdk.Builder(meterProviderSharedState, meterSharedState, name);
return new DoubleGaugeBuilderSdk(meterProviderSharedState, meterSharedState, name);
}
}

View File

@ -12,6 +12,8 @@ import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.internal.ComponentRegistry;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collection;

View File

@ -7,6 +7,8 @@ package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistryBuilder;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;

View File

@ -27,17 +27,17 @@ abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
instrumentDescriptor,
resolveStateful(instrumentDescriptor.getType(), temporality));
InstrumentType type = instrumentDescriptor.getType();
this.isMonotonic = type == InstrumentType.COUNTER || type == InstrumentType.SUM_OBSERVER;
this.isMonotonic = type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_SUM;
this.temporality = temporality;
this.mergeStrategy = resolveMergeStrategy(type, temporality);
}
/**
* Resolve whether the aggregator should be stateful. For the special case {@link
* InstrumentType#SUM_OBSERVER} and {@link InstrumentType#UP_DOWN_SUM_OBSERVER} instruments, state
* is required if temporality is {@link AggregationTemporality#DELTA}. Because the observed values
* are cumulative sums, we must maintain state to compute delta sums between collections. For
* other instruments, state is required if temporality is {@link
* InstrumentType#OBSERVABLE_SUM} and {@link InstrumentType#OBSERVABLE_UP_DOWN_SUM} instruments,
* state is required if temporality is {@link AggregationTemporality#DELTA}. Because the observed
* values are cumulative sums, we must maintain state to compute delta sums between collections.
* For other instruments, state is required if temporality is {@link
* AggregationTemporality#CUMULATIVE}.
*
* @param instrumentType the instrument type
@ -46,8 +46,8 @@ abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
*/
private static boolean resolveStateful(
InstrumentType instrumentType, AggregationTemporality temporality) {
if (instrumentType == InstrumentType.SUM_OBSERVER
|| instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER) {
if (instrumentType == InstrumentType.OBSERVABLE_SUM
|| instrumentType == InstrumentType.OBSERVABLE_UP_DOWN_SUM) {
return temporality == AggregationTemporality.DELTA;
} else {
return temporality == AggregationTemporality.CUMULATIVE;
@ -57,8 +57,9 @@ abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
/**
* Resolve the aggregator merge strategy. The merge strategy is SUM in all cases except where
* temporality is {@link AggregationTemporality#DELTA} and instrument type is {@link
* InstrumentType#SUM_OBSERVER} or {@link InstrumentType#UP_DOWN_SUM_OBSERVER}. In these special
* cases, the observed values are cumulative sums so we must take a diff to compute the delta sum.
* InstrumentType#OBSERVABLE_SUM} or {@link InstrumentType#OBSERVABLE_UP_DOWN_SUM}. In these
* special cases, the observed values are cumulative sums so we must take a diff to compute the
* delta sum.
*
* @param instrumentType the instrument type
* @param temporality the temporality
@ -67,8 +68,8 @@ abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
// Visible for testing
static MergeStrategy resolveMergeStrategy(
InstrumentType instrumentType, AggregationTemporality temporality) {
if ((instrumentType == InstrumentType.SUM_OBSERVER
|| instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER)
if ((instrumentType == InstrumentType.OBSERVABLE_SUM
|| instrumentType == InstrumentType.OBSERVABLE_UP_DOWN_SUM)
&& temporality == AggregationTemporality.DELTA) {
return MergeStrategy.DIFF;
} else {

View File

@ -5,6 +5,9 @@
package io.opentelemetry.sdk.metrics.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@ -23,7 +26,7 @@ import javax.annotation.concurrent.ThreadSafe;
* of the bits are used for reference (usage) counting.
*/
@ThreadSafe
public abstract class AggregatorHandle<T> {
public abstract class AggregatorHandle<T> implements BoundStorageHandle {
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
@ -50,6 +53,7 @@ public abstract class AggregatorHandle<T> {
}
/** Release this {@code Aggregator}. It decreases the reference usage. */
@Override
public final void release() {
// Every reference adds/removes 2 instead of 1 to avoid changing the mapping bit.
refCountMapped.getAndAdd(-2L);
@ -96,6 +100,11 @@ public abstract class AggregatorHandle<T> {
hasRecordings = true;
}
@Override
public final void recordLong(long value, Attributes attributes, Context context) {
recordLong(value);
}
/**
* Concrete Aggregator instances should implement this method in order support recordings of long
* values.
@ -115,6 +124,11 @@ public abstract class AggregatorHandle<T> {
hasRecordings = true;
}
@Override
public final void recordDouble(double value, Attributes attributes, Context context) {
recordDouble(value);
}
/**
* Concrete Aggregator instances should implement this method in order support recordings of
* double values.

View File

@ -9,8 +9,8 @@ package io.opentelemetry.sdk.metrics.common;
public enum InstrumentType {
COUNTER,
UP_DOWN_COUNTER,
VALUE_RECORDER,
SUM_OBSERVER,
UP_DOWN_SUM_OBSERVER,
VALUE_OBSERVER,
HISTOGRAM,
OBSERVABLE_SUM,
OBSERVABLE_UP_DOWN_SUM,
OBSERVABLE_GAUGE,
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.descriptor;
import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import javax.annotation.concurrent.Immutable;
/**
* Describes a metric that will be output.
*
* <p>Provides equality/identity semantics for detecting duplicate metrics of incompatible.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@AutoValue
@Immutable
public abstract class MetricDescriptor {
public static MetricDescriptor create(String name, String description, String unit) {
return new AutoValue_MetricDescriptor(name, description, unit);
}
public abstract String getName();
public abstract String getDescription();
public abstract String getUnit();
@Memoized
@Override
public abstract int hashCode();
public boolean isCompatibleWith(MetricDescriptor other) {
// TODO: implement.
return equals(other);
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
@ -12,28 +12,35 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
final class AsynchronousInstrumentAccumulator extends AbstractAccumulator {
/**
* Stores aggregated {@link MetricData} for asynchronous instruments.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class AsynchronousMetricStorage implements MetricStorage {
private final MetricDescriptor metricDescriptor;
private final ReentrantLock collectLock = new ReentrantLock();
private final InstrumentProcessor<?> instrumentProcessor;
private final Runnable metricUpdater;
static <T> AsynchronousInstrumentAccumulator doubleAsynchronousAccumulator(
/** Constructs storage for {@code double} valued instruments. */
public static <T> AsynchronousMetricStorage doubleAsynchronousAccumulator(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor,
Consumer<ObservableDoubleMeasurement> metricUpdater) {
Aggregator<T> aggregator =
getAggregator(meterProviderSharedState, meterSharedState, descriptor);
Aggregator<T> aggregator = meterProviderSharedState.getAggregator(meterSharedState, descriptor);
final InstrumentProcessor<T> instrumentProcessor =
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos());
final LabelsProcessor labelsProcessor =
getLabelsProcessor(meterProviderSharedState, meterSharedState, descriptor);
meterProviderSharedState.getLabelsProcessor(meterSharedState, descriptor);
final ObservableDoubleMeasurement result =
new ObservableDoubleMeasurement() {
@ -50,22 +57,26 @@ final class AsynchronousInstrumentAccumulator extends AbstractAccumulator {
}
};
return new AsynchronousInstrumentAccumulator(
instrumentProcessor, () -> metricUpdater.accept(result));
return new AsynchronousMetricStorage(
// TODO: View can change metric name/description. Update this when wired in.
MetricDescriptor.create(
descriptor.getName(), descriptor.getDescription(), descriptor.getUnit()),
instrumentProcessor,
() -> metricUpdater.accept(result));
}
static <T> AsynchronousInstrumentAccumulator longAsynchronousAccumulator(
/** Constructs storage for {@code long} valued instruments. */
public static <T> AsynchronousMetricStorage longAsynchronousAccumulator(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor,
Consumer<ObservableLongMeasurement> metricUpdater) {
Aggregator<T> aggregator =
getAggregator(meterProviderSharedState, meterSharedState, descriptor);
Aggregator<T> aggregator = meterProviderSharedState.getAggregator(meterSharedState, descriptor);
final InstrumentProcessor<T> instrumentProcessor =
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos());
final LabelsProcessor labelsProcessor =
getLabelsProcessor(meterProviderSharedState, meterSharedState, descriptor);
meterProviderSharedState.getLabelsProcessor(meterSharedState, descriptor);
final ObservableLongMeasurement result =
new ObservableLongMeasurement() {
@ -81,18 +92,25 @@ final class AsynchronousInstrumentAccumulator extends AbstractAccumulator {
observe(value, Attributes.empty());
}
};
return new AsynchronousInstrumentAccumulator(
instrumentProcessor, () -> metricUpdater.accept(result));
return new AsynchronousMetricStorage(
// TODO: View can change metric name/description. Update this when wired in.
MetricDescriptor.create(
descriptor.getName(), descriptor.getDescription(), descriptor.getUnit()),
instrumentProcessor,
() -> metricUpdater.accept(result));
}
private AsynchronousInstrumentAccumulator(
InstrumentProcessor<?> instrumentProcessor, Runnable metricUpdater) {
private AsynchronousMetricStorage(
MetricDescriptor metricDescriptor,
InstrumentProcessor<?> instrumentProcessor,
Runnable metricUpdater) {
this.metricDescriptor = metricDescriptor;
this.instrumentProcessor = instrumentProcessor;
this.metricUpdater = metricUpdater;
}
@Override
List<MetricData> collectAll(long epochNanos) {
public MetricData collectAndReset(long startEpochNanos, long epochNanos) {
collectLock.lock();
try {
metricUpdater.run();
@ -101,4 +119,9 @@ final class AsynchronousInstrumentAccumulator extends AbstractAccumulator {
collectLock.unlock();
}
}
@Override
public MetricDescriptor getMetricDescriptor() {
return metricDescriptor;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
/**
* A bound handle for recording measurements against a particular set of attributes.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface BoundStorageHandle {
/** Records a measurement. */
void recordLong(long value, Attributes attributes, Context context);
/** Records a measurement. */
void recordDouble(double value, Attributes attributes, Context context);
/** Release this handle back to the storage. */
void release();
}

View File

@ -3,23 +3,24 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* An {@code InstrumentProcessor} represents an internal instance of an {@code Accumulator} for a
* specific {code Instrument}. It records individual measurements (via the {@code Aggregator}). It
* batches together {@code Aggregator}s for the similar sets of labels.
* batches together {@code Aggregator}s for the similar sets of attributes.
*
* <p>An entire collection cycle must be protected by a lock. A collection cycle is defined by
* multiple calls to {@code #batch(...)} followed by one {@code #completeCollectionCycle(...)};
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class InstrumentProcessor<T> {
private final Aggregator<T> aggregator;
@ -35,19 +36,17 @@ final class InstrumentProcessor<T> {
}
/**
* Batches multiple entries together that are part of the same metric. It may remove labels from
* the {@link Labels} and merge aggregations together.
* Batches multiple entries together that are part of the same metric. It may remove attributes
* from the {@link Attributes} and merge aggregations together.
*
* @param labelSet the {@link Labels} associated with this {@code Aggregator}.
* @param attributes the {@link Attributes} associated with this {@code Aggregator}.
* @param accumulation the accumulation produced by this instrument.
*/
void batch(Attributes labelSet, T accumulation) {
T currentAccumulation = accumulationMap.get(labelSet);
if (currentAccumulation == null) {
accumulationMap.put(labelSet, accumulation);
return;
void batch(Attributes attributes, T accumulation) {
T currentAccumulation = accumulationMap.putIfAbsent(attributes, accumulation);
if (currentAccumulation != null) {
accumulationMap.put(attributes, aggregator.merge(currentAccumulation, accumulation));
}
accumulationMap.put(labelSet, aggregator.merge(currentAccumulation, accumulation));
}
/**
@ -58,11 +57,11 @@ final class InstrumentProcessor<T> {
* <p>Based on the configured options this method may reset the internal state to produce deltas,
* or keep the internal state to produce cumulative metrics.
*
* @return the list of metrics batched in this Batcher.
* @return the metric batched or {@code null}.
*/
List<MetricData> completeCollectionCycle(long epochNanos) {
MetricData completeCollectionCycle(long epochNanos) {
if (accumulationMap.isEmpty()) {
return Collections.emptyList();
return null;
}
MetricData metricData =
@ -73,6 +72,6 @@ final class InstrumentProcessor<T> {
accumulationMap = new HashMap<>();
}
return metricData == null ? Collections.emptyList() : Collections.singletonList(metricData);
return metricData;
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import io.opentelemetry.sdk.resources.Resource;
import javax.annotation.concurrent.Immutable;
/**
* State for a {@code MeterProvider}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@AutoValue
@Immutable
public abstract class MeterProviderSharedState {
public static MeterProviderSharedState create(
Clock clock, Resource resource, ViewRegistry viewRegistry) {
return new AutoValue_MeterProviderSharedState(clock, resource, viewRegistry, clock.now());
}
/** Returns the clock used for measurements. */
public abstract Clock getClock();
/** Returns the {@link Resource} to attach telemetry to. */
abstract Resource getResource();
/** Returns the {@link ViewRegistry} for custom aggregation and metric definitions. */
abstract ViewRegistry getViewRegistry();
/**
* Returns the timestamp when this {@code MeterProvider} was started, in nanoseconds since Unix
* epoch time.
*/
abstract long getStartEpochNanos();
/** Returns the {@link Aggregator} to use for a given instrument. */
public <T> Aggregator<T> getAggregator(
MeterSharedState meterSharedState, InstrumentDescriptor descriptor) {
return getViewRegistry()
.findView(descriptor)
.getAggregatorFactory()
.create(getResource(), meterSharedState.getInstrumentationLibraryInfo(), descriptor);
}
/** Returns the {@link LabelsProcessor} to use for a given instrument. */
public LabelsProcessor getLabelsProcessor(
MeterSharedState meterSharedState, InstrumentDescriptor descriptor) {
return getViewRegistry()
.findView(descriptor)
.getLabelsProcessorFactory()
.create(getResource(), meterSharedState.getInstrumentationLibraryInfo(), descriptor);
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.concurrent.Immutable;
/**
* State for a {@code Meter}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@AutoValue
@Immutable
public abstract class MeterSharedState {
public static MeterSharedState create(InstrumentationLibraryInfo instrumentationLibraryInfo) {
return new AutoValue_MeterSharedState(instrumentationLibraryInfo, new MetricStorageRegistry());
}
// only visible for testing.
/** Returns the {@link InstrumentationLibraryInfo} for this {@code Meter}. */
public abstract InstrumentationLibraryInfo getInstrumentationLibraryInfo();
/** Returns the metric storage for metrics in this {@code Meter}. */
abstract MetricStorageRegistry getMetricStorageRegistry();
/** Collects all accumulated metric stream points. */
public List<MetricData> collectAll(
MeterProviderSharedState meterProviderSharedState, long epochNanos) {
Collection<MetricStorage> metrics = getMetricStorageRegistry().getMetrics();
List<MetricData> result = new ArrayList<>(metrics.size());
for (MetricStorage metric : metrics) {
MetricData current =
metric.collectAndReset(meterProviderSharedState.getStartEpochNanos(), epochNanos);
if (current != null) {
result.add(current);
}
}
return result;
}
/** Registers new synchronous storage associated with a given instrument. */
public final WriteableMetricStorage registerSynchronousMetricStorage(
InstrumentDescriptor instrument, MeterProviderSharedState meterProviderSharedState) {
return getMetricStorageRegistry()
.register(SynchronousMetricStorage.create(meterProviderSharedState, this, instrument));
}
/** Registers new asynchronous storage associated with a given {@code long} instrument. */
public final MetricStorage registerLongAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableLongMeasurement> metricUpdater) {
return getMetricStorageRegistry()
.register(
AsynchronousMetricStorage.longAsynchronousAccumulator(
meterProviderSharedState, this, instrument, metricUpdater));
}
/** Registers new asynchronous storage associated with a given {@code double} instrument. */
public final MetricStorage registerDoubleAsynchronousInstrument(
InstrumentDescriptor instrument,
MeterProviderSharedState meterProviderSharedState,
Consumer<ObservableDoubleMeasurement> metricUpdater) {
return getMetricStorageRegistry()
.register(
AsynchronousMetricStorage.doubleAsynchronousAccumulator(
meterProviderSharedState, this, instrument, metricUpdater));
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
/**
* Stores collected {@link MetricData}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface MetricStorage {
/** Returns a description of the metric produced in this storage. */
MetricDescriptor getMetricDescriptor();
/**
* Collects the metrics from this storage and resets for the next collection period.
*
* @param startEpochNanos The start timestamp for this SDK.
* @param epochNanos The timestamp for this collection.
* @return The {@link MetricData} from this collection period, or {@code null}.
*/
MetricData collectAndReset(long startEpochNanos, long epochNanos);
}

View File

@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Responsible for storing metrics (by name) and returning access to input pipeline for instrument
* wiring.
*
* <p>The rules of the registry:
*
* <ul>
* <li>Only one storage type may be registered per-name. Repeated look-ups per-name will return
* the same storage.
* <li>The metric descriptor should be "compatible", when returning an existing metric storage,
* i.e. same type of metric, same name, description etc.
* <li>The registered storage type MUST be either always Asynchronous or always Synchronous. No
* mixing and matching.
* </ul>
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class MetricStorageRegistry {
private final ConcurrentMap<String, MetricStorage> registry = new ConcurrentHashMap<>();
/**
* Returns a {@code Collection} view of the registered {@link MetricStorage}.
*
* @return a {@code Collection} view of the registered {@link MetricStorage}.
*/
public Collection<MetricStorage> getMetrics() {
return Collections.unmodifiableCollection(new ArrayList<>(registry.values()));
}
/**
* Registers the given {@code Metric} to this registry. Returns the registered storage if no other
* metric with the same name is registered or a previously registered metric with same name and
* equal with the current metric, otherwise throws an exception.
*
* @param storage the metric storage to use or discard.
* @return the given metric storage if no metric with same name already registered, otherwise the
* previous registered instrument.
* @throws IllegalArgumentException if instrument cannot be registered.
*/
@SuppressWarnings("unchecked")
public <I extends MetricStorage> I register(I storage) {
MetricDescriptor descriptor = storage.getMetricDescriptor();
MetricStorage oldOrNewStorage =
registry.computeIfAbsent(descriptor.getName().toLowerCase(), key -> storage);
// Make sure the storage is compatible.
if (!oldOrNewStorage.getMetricDescriptor().isCompatibleWith(descriptor)) {
throw new IllegalArgumentException(
"Metric with same name and different descriptor already created. Found: "
+ oldOrNewStorage.getMetricDescriptor()
+ ", Want: "
+ descriptor);
}
// Make sure we aren't mixing sync + async.
if (!storage.getClass().equals(oldOrNewStorage.getClass())) {
throw new IllegalArgumentException(
"Metric with same name and different instrument already created. Found: "
+ oldOrNewStorage.getClass()
+ ", Want: "
+ storage.getClass());
}
return (I) oldOrNewStorage;
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
@ -11,36 +11,48 @@ import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
/**
* Stores aggregated {@link MetricData} for synchronous instruments.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class SynchronousMetricStorage<T> implements WriteableMetricStorage {
private final MetricDescriptor metricDescriptor;
private final ConcurrentHashMap<Attributes, AggregatorHandle<T>> aggregatorLabels;
private final ReentrantLock collectLock;
private final Aggregator<T> aggregator;
private final InstrumentProcessor<T> instrumentProcessor;
private final LabelsProcessor labelsProcessor;
static <T> SynchronousInstrumentAccumulator<T> create(
/** Constructs metric storage for a given synchronous instrument. */
public static <T> SynchronousMetricStorage<T> create(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) {
Aggregator<T> aggregator =
getAggregator(meterProviderSharedState, meterSharedState, descriptor);
return new SynchronousInstrumentAccumulator<>(
Aggregator<T> aggregator = meterProviderSharedState.getAggregator(meterSharedState, descriptor);
return new SynchronousMetricStorage<>(
// TODO: View can change metric name/description. Update this when wired in.
MetricDescriptor.create(
descriptor.getName(), descriptor.getDescription(), descriptor.getUnit()),
aggregator,
new InstrumentProcessor<>(aggregator, meterProviderSharedState.getStartEpochNanos()),
getLabelsProcessor(meterProviderSharedState, meterSharedState, descriptor));
meterProviderSharedState.getLabelsProcessor(meterSharedState, descriptor));
}
SynchronousInstrumentAccumulator(
SynchronousMetricStorage(
MetricDescriptor metricDescriptor,
Aggregator<T> aggregator,
InstrumentProcessor<T> instrumentProcessor,
LabelsProcessor labelsProcessor) {
this.metricDescriptor = metricDescriptor;
aggregatorLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
this.aggregator = aggregator;
@ -48,10 +60,11 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
this.labelsProcessor = labelsProcessor;
}
AggregatorHandle<?> bind(Attributes labels) {
Objects.requireNonNull(labels, "labels");
labels = labelsProcessor.onLabelsBound(Context.current(), labels);
AggregatorHandle<T> aggregatorHandle = aggregatorLabels.get(labels);
@Override
public BoundStorageHandle bind(Attributes attributes) {
Objects.requireNonNull(attributes, "attributes");
attributes = labelsProcessor.onLabelsBound(Context.current(), attributes);
AggregatorHandle<T> aggregatorHandle = aggregatorLabels.get(attributes);
if (aggregatorHandle != null && aggregatorHandle.acquire()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
return aggregatorHandle;
@ -61,7 +74,7 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
aggregatorHandle = aggregator.createHandle();
while (true) {
AggregatorHandle<?> boundAggregatorHandle =
aggregatorLabels.putIfAbsent(labels, aggregatorHandle);
aggregatorLabels.putIfAbsent(attributes, aggregatorHandle);
if (boundAggregatorHandle != null) {
if (boundAggregatorHandle.acquire()) {
// At this moment it is guaranteed that the Bound is in the map and will not be removed.
@ -69,7 +82,7 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
}
// Try to remove the boundAggregator. This will race with the collect method, but only one
// will succeed.
aggregatorLabels.remove(labels, boundAggregatorHandle);
aggregatorLabels.remove(attributes, boundAggregatorHandle);
continue;
}
return aggregatorHandle;
@ -77,7 +90,7 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
}
@Override
List<MetricData> collectAll(long epochNanos) {
public MetricData collectAndReset(long startEpochNanos, long epochNanos) {
collectLock.lock();
try {
for (Map.Entry<Attributes, AggregatorHandle<T>> entry : aggregatorLabels.entrySet()) {
@ -98,4 +111,9 @@ final class SynchronousInstrumentAccumulator<T> extends AbstractAccumulator {
collectLock.unlock();
}
}
@Override
public MetricDescriptor getMetricDescriptor() {
return metricDescriptor;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
/**
* Stores {@link MetricData} and allows synchronous writes of measurements.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface WriteableMetricStorage extends MetricStorage {
/** Bind an efficient storage handle for a set of attributes. */
BoundStorageHandle bind(Attributes attributes);
/** Records a measurement. */
default void recordLong(long value, Attributes attributes, Context context) {
BoundStorageHandle handle = bind(attributes);
try {
handle.recordLong(value, attributes, context);
} finally {
handle.release();
}
}
/** Records a measurement. */
default void recordDouble(double value, Attributes attributes, Context context) {
BoundStorageHandle handle = bind(attributes);
try {
handle.recordDouble(value, attributes, context);
} finally {
handle.release();
}
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
@ -19,9 +19,12 @@ import javax.annotation.concurrent.Immutable;
/**
* Central location for Views to be registered. Registration of a view is done via the {@link
* SdkMeterProviderBuilder}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@Immutable
final class ViewRegistry {
public final class ViewRegistry {
static final View CUMULATIVE_SUM =
View.builder()
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.CUMULATIVE))
@ -41,11 +44,18 @@ final class ViewRegistry {
this.configuration.put(instrumentType, new LinkedHashMap<>(patternViewLinkedHashMap)));
}
static ViewRegistryBuilder builder() {
/** Returns a builder of {@link ViewRegistry}. */
public static ViewRegistryBuilder builder() {
return new ViewRegistryBuilder();
}
View findView(InstrumentDescriptor descriptor) {
/**
* Returns the metric {@link View} for a given instrument.
*
* @param descriptor description of the instrument.
* @return The {@link View} for this instrument, or a default aggregation view.
*/
public View findView(InstrumentDescriptor descriptor) {
LinkedHashMap<Pattern, View> configPerType = configuration.get(descriptor.getType());
for (Map.Entry<Pattern, View> entry : configPerType.entrySet()) {
if (entry.getKey().matcher(descriptor.getName()).matches()) {
@ -60,12 +70,12 @@ final class ViewRegistry {
switch (descriptor.getType()) {
case COUNTER:
case UP_DOWN_COUNTER:
case SUM_OBSERVER:
case UP_DOWN_SUM_OBSERVER:
case OBSERVABLE_SUM:
case OBSERVABLE_UP_DOWN_SUM:
return CUMULATIVE_SUM;
case VALUE_RECORDER:
case HISTOGRAM:
return SUMMARY;
case VALUE_OBSERVER:
case OBSERVABLE_GAUGE:
return LAST_VALUE;
}
throw new IllegalArgumentException("Unknown descriptor type: " + descriptor.getType());

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
@ -12,7 +12,13 @@ import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.regex.Pattern;
class ViewRegistryBuilder {
/**
* Builder for {@link ViewRegistry}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class ViewRegistryBuilder {
private final EnumMap<InstrumentType, LinkedHashMap<Pattern, View>> configuration =
new EnumMap<>(InstrumentType.class);
private static final LinkedHashMap<Pattern, View> EMPTY_CONFIG = new LinkedHashMap<>();
@ -23,11 +29,19 @@ class ViewRegistryBuilder {
}
}
ViewRegistry build() {
/** Returns the {@link ViewRegistry}. */
public ViewRegistry build() {
return new ViewRegistry(configuration);
}
ViewRegistryBuilder addView(InstrumentSelector selector, View view) {
/**
* Adds a new view to the registry.
*
* @param selector The instruments that should have their defaults altered.
* @param view The {@link View} metric definition.
* @return this
*/
public ViewRegistryBuilder addView(InstrumentSelector selector, View view) {
LinkedHashMap<Pattern, View> parentConfiguration =
configuration.get(selector.getInstrumentType());
configuration.put(

View File

@ -10,9 +10,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link AbstractInstrument}. */
@ -31,10 +28,5 @@ class AbstractInstrumentTest {
TestInstrument(InstrumentDescriptor descriptor) {
super(descriptor);
}
@Override
List<MetricData> collectAll(long epochNanos) {
return Collections.emptyList();
}
}
}

View File

@ -33,18 +33,18 @@ class DoubleCounterSdkTest {
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void add_PreventNullLabels() {
void add_PreventNullAttributes() {
assertThatThrownBy(
() -> sdkMeter.counterBuilder("testCounter").ofDoubles().build().add(1.0, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.counterBuilder("testCounter").ofDoubles().build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -60,7 +60,7 @@ class DoubleCounterSdkTest {
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithEmptyLabel() {
void collectMetrics_WithEmptyAttributes() {
DoubleCounter doubleCounter =
sdkMeter
.counterBuilder("testCounter")

View File

@ -17,11 +17,11 @@ import java.time.Duration;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link DoubleValueObserverSdk}. */
class DoubleValueObserverSdkTest {
class DoubleGaugeBuilderSdkTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(DoubleValueObserverSdkTest.class.getName(), null);
InstrumentationLibraryInfo.create(DoubleGaugeBuilderSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build();

View File

@ -28,30 +28,30 @@ import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link DoubleValueRecorderSdk}. */
class DoubleValueRecorderSdkTest {
/** Unit tests for {@link DoubleHistogramSdk}. */
class DoubleHistogramSdkTest {
private static final long SECOND_NANOS = 1_000_000_000;
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(DoubleValueRecorderSdkTest.class.getName(), null);
InstrumentationLibraryInfo.create(DoubleHistogramSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void record_PreventNullLabels() {
void record_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.histogramBuilder("testRecorder").build().record(1.0, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.histogramBuilder("testRecorder").build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -67,7 +67,7 @@ class DoubleValueRecorderSdkTest {
}
@Test
void collectMetrics_WithEmptyLabel() {
void collectMetrics_WithEmptyAttributes() {
DoubleHistogram doubleRecorder =
sdkMeter
.histogramBuilder("testRecorder")
@ -188,7 +188,7 @@ class DoubleValueRecorderSdkTest {
StressTestRunner.Builder stressTestBuilder =
StressTestRunner.builder()
.setInstrument((DoubleValueRecorderSdk) doubleRecorder)
.setInstrument((DoubleHistogramSdk) doubleRecorder)
.setCollectionIntervalMs(100);
for (int i = 0; i < 4; i++) {
@ -196,7 +196,7 @@ class DoubleValueRecorderSdkTest {
StressTestRunner.Operation.create(
1_000,
2,
new DoubleValueRecorderSdkTest.OperationUpdaterDirectCall(doubleRecorder, "K", "V")));
new DoubleHistogramSdkTest.OperationUpdaterDirectCall(doubleRecorder, "K", "V")));
stressTestBuilder.addOperation(
StressTestRunner.Operation.create(
1_000,
@ -233,7 +233,7 @@ class DoubleValueRecorderSdkTest {
StressTestRunner.Builder stressTestBuilder =
StressTestRunner.builder()
.setInstrument((DoubleValueRecorderSdk) doubleRecorder)
.setInstrument((DoubleHistogramSdk) doubleRecorder)
.setCollectionIntervalMs(100);
for (int i = 0; i < 4; i++) {
@ -241,7 +241,7 @@ class DoubleValueRecorderSdkTest {
StressTestRunner.Operation.create(
2_000,
1,
new DoubleValueRecorderSdkTest.OperationUpdaterDirectCall(
new DoubleHistogramSdkTest.OperationUpdaterDirectCall(
doubleRecorder, keys[i], values[i])));
stressTestBuilder.addOperation(

View File

@ -109,7 +109,9 @@ class DoubleSumObserverSdkTest {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(),
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.OBSERVABLE_SUM)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))

View File

@ -33,7 +33,7 @@ class DoubleUpDownCounterSdkTest {
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void add_PreventNullLabels() {
void add_PreventNullAttributes() {
assertThatThrownBy(
() ->
sdkMeter
@ -42,15 +42,15 @@ class DoubleUpDownCounterSdkTest {
.build()
.add(1.0, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(
() -> sdkMeter.upDownCounterBuilder("testUpDownCounter").ofDoubles().build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -68,7 +68,7 @@ class DoubleUpDownCounterSdkTest {
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithEmptyLabel() {
void collectMetrics_WithEmptyAttributes() {
DoubleUpDownCounter doubleUpDownCounter =
sdkMeter
.upDownCounterBuilder("testUpDownCounter")

View File

@ -106,7 +106,7 @@ class DoubleUpDownSumObserverSdkTest {
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER)
.setInstrumentType(InstrumentType.OBSERVABLE_UP_DOWN_SUM)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())

View File

@ -1,97 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link InstrumentRegistry}. */
class InstrumentRegistryTest {
private static final InstrumentDescriptor INSTRUMENT_DESCRIPTOR =
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.COUNTER, InstrumentValueType.LONG);
private static final InstrumentDescriptor OTHER_INSTRUMENT_DESCRIPTOR =
InstrumentDescriptor.create(
"name", "other_description", "1", InstrumentType.COUNTER, InstrumentValueType.LONG);
@Test
void register() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestInstrument testInstrument = new TestInstrument(INSTRUMENT_DESCRIPTOR);
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(
meterSharedState
.getInstrumentRegistry()
.register(new TestInstrument(INSTRUMENT_DESCRIPTOR)))
.isSameAs(testInstrument);
}
@Test
void register_OtherDescriptor() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestInstrument testInstrument = new TestInstrument(INSTRUMENT_DESCRIPTOR);
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThatThrownBy(
() ->
meterSharedState
.getInstrumentRegistry()
.register(new TestInstrument(OTHER_INSTRUMENT_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
}
@Test
void register_OtherInstance() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestInstrument testInstrument = new TestInstrument(INSTRUMENT_DESCRIPTOR);
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThatThrownBy(
() ->
meterSharedState
.getInstrumentRegistry()
.register(new OtherTestInstrument(INSTRUMENT_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
}
private static final class TestInstrument extends AbstractInstrument {
TestInstrument(InstrumentDescriptor descriptor) {
super(descriptor);
}
@Override
List<MetricData> collectAll(long epochNanos) {
return Collections.emptyList();
}
}
private static final class OtherTestInstrument extends AbstractInstrument {
OtherTestInstrument(InstrumentDescriptor descriptor) {
super(descriptor);
}
@Override
List<MetricData> collectAll(long epochNanos) {
return Collections.emptyList();
}
}
}

View File

@ -33,17 +33,17 @@ class LongCounterSdkTest {
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void add_PreventNullLabels() {
void add_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.counterBuilder("testCounter").build().add(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.counterBuilder("testCounter").build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -59,7 +59,7 @@ class LongCounterSdkTest {
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithEmptyLabels() {
void collectMetrics_WithEmptyAttributes() {
LongCounter longCounter =
sdkMeter.counterBuilder("testCounter").setDescription("description").setUnit("By").build();
testClock.advance(Duration.ofNanos(SECOND_NANOS));

View File

@ -17,11 +17,11 @@ import java.time.Duration;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link LongValueObserverSdk}. */
class LongValueObserverSdkTest {
class LongGaugeBuilderSdkTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(LongValueObserverSdkTest.class.getName(), null);
InstrumentationLibraryInfo.create(LongGaugeBuilderSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build();

View File

@ -28,31 +28,31 @@ import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link LongValueRecorderSdk}. */
class LongValueRecorderSdkTest {
/** Unit tests for {@link LongHistogramSdk}. */
class LongHistogramSdkTest {
private static final long SECOND_NANOS = 1_000_000_000;
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(LongValueRecorderSdkTest.class.getName(), null);
InstrumentationLibraryInfo.create(LongHistogramSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void record_PreventNullLabels() {
void record_PreventNullAttributes() {
assertThatThrownBy(
() -> sdkMeter.histogramBuilder("testRecorder").ofLongs().build().record(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.histogramBuilder("testRecorder").ofLongs().build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -67,7 +67,7 @@ class LongValueRecorderSdkTest {
}
@Test
void collectMetrics_WithEmptyLabel() {
void collectMetrics_WithEmptyAttributes() {
LongHistogram longRecorder =
sdkMeter
.histogramBuilder("testRecorder")
@ -188,7 +188,7 @@ class LongValueRecorderSdkTest {
StressTestRunner.Builder stressTestBuilder =
StressTestRunner.builder()
.setInstrument((LongValueRecorderSdk) longRecorder)
.setInstrument((LongHistogramSdk) longRecorder)
.setCollectionIntervalMs(100);
for (int i = 0; i < 4; i++) {
@ -196,12 +196,12 @@ class LongValueRecorderSdkTest {
StressTestRunner.Operation.create(
2_000,
1,
new LongValueRecorderSdkTest.OperationUpdaterDirectCall(longRecorder, "K", "V")));
new LongHistogramSdkTest.OperationUpdaterDirectCall(longRecorder, "K", "V")));
stressTestBuilder.addOperation(
StressTestRunner.Operation.create(
2_000,
1,
new LongValueRecorderSdkTest.OperationUpdaterWithBinding(
new LongHistogramSdkTest.OperationUpdaterWithBinding(
longRecorder.bind(Attributes.builder().put("K", "V").build()))));
}
@ -233,7 +233,7 @@ class LongValueRecorderSdkTest {
StressTestRunner.Builder stressTestBuilder =
StressTestRunner.builder()
.setInstrument((LongValueRecorderSdk) longRecorder)
.setInstrument((LongHistogramSdk) longRecorder)
.setCollectionIntervalMs(100);
for (int i = 0; i < 4; i++) {
@ -241,14 +241,14 @@ class LongValueRecorderSdkTest {
StressTestRunner.Operation.create(
1_000,
2,
new LongValueRecorderSdkTest.OperationUpdaterDirectCall(
new LongHistogramSdkTest.OperationUpdaterDirectCall(
longRecorder, keys[i], values[i])));
stressTestBuilder.addOperation(
StressTestRunner.Operation.create(
1_000,
2,
new LongValueRecorderSdkTest.OperationUpdaterWithBinding(
new LongHistogramSdkTest.OperationUpdaterWithBinding(
longRecorder.bind(Attributes.builder().put(keys[i], values[i]).build()))));
}

View File

@ -103,7 +103,9 @@ class LongSumObserverSdkTest {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(),
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.OBSERVABLE_SUM)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))

View File

@ -33,17 +33,17 @@ class LongUpDownCounterSdkTest {
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test
void add_PreventNullLabels() {
void add_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.upDownCounterBuilder("testCounter").build().add(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
void bound_PreventNullLabels() {
void bound_PreventNullAttributes() {
assertThatThrownBy(() -> sdkMeter.upDownCounterBuilder("testUpDownCounter").build().bind(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("labels");
.hasMessage("attributes");
}
@Test
@ -61,7 +61,7 @@ class LongUpDownCounterSdkTest {
@Test
@SuppressWarnings("unchecked")
void collectMetrics_WithEmptyLabel() {
void collectMetrics_WithEmptyAttributes() {
LongUpDownCounter longUpDownCounter =
sdkMeter
.upDownCounterBuilder("testUpDownCounter")

View File

@ -104,7 +104,7 @@ class LongUpDownSumObserverSdkTest {
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER)
.setInstrumentType(InstrumentType.OBSERVABLE_UP_DOWN_SUM)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())

View File

@ -31,20 +31,21 @@ class SdkMeterTest {
.build();
assertThat(longCounter).isNotNull();
assertThat(
sdkMeter
.counterBuilder("testLongCounter")
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build())
.isSameAs(longCounter);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.counterBuilder("testLongCounter")
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(() -> sdkMeter.counterBuilder("testLongCounter").build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(() -> sdkMeter.counterBuilder("testLongCounter".toUpperCase()).build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -57,21 +58,22 @@ class SdkMeterTest {
.build();
assertThat(longUpDownCounter).isNotNull();
assertThat(
sdkMeter
.upDownCounterBuilder("testLongUpDownCounter")
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build())
.isSameAs(longUpDownCounter);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.upDownCounterBuilder("testLongUpDownCounter")
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(() -> sdkMeter.upDownCounterBuilder("testLongUpDownCounter").build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() -> sdkMeter.upDownCounterBuilder("testLongUpDownCounter".toUpperCase()).build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -85,23 +87,24 @@ class SdkMeterTest {
.build();
assertThat(longValueRecorder).isNotNull();
assertThat(
sdkMeter
.histogramBuilder("testLongValueRecorder")
.ofLongs()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build())
.isSameAs(longValueRecorder);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.histogramBuilder("testLongValueRecorder")
.ofLongs()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(() -> sdkMeter.histogramBuilder("testLongValueRecorder").ofLongs().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter.histogramBuilder("testLongValueRecorder".toUpperCase()).ofLongs().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -116,7 +119,7 @@ class SdkMeterTest {
assertThatThrownBy(
() -> sdkMeter.gaugeBuilder("longValueObserver").ofLongs().buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter
@ -124,7 +127,7 @@ class SdkMeterTest {
.ofLongs()
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -138,7 +141,7 @@ class SdkMeterTest {
assertThatThrownBy(
() -> sdkMeter.counterBuilder("testLongSumObserver").buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
@ -146,7 +149,7 @@ class SdkMeterTest {
.counterBuilder("testLongSumObserver".toUpperCase())
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -163,7 +166,7 @@ class SdkMeterTest {
.upDownCounterBuilder("testLongUpDownSumObserver")
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
@ -171,7 +174,7 @@ class SdkMeterTest {
.upDownCounterBuilder("testLongUpDownSumObserver".toUpperCase())
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -185,22 +188,23 @@ class SdkMeterTest {
.build();
assertThat(doubleCounter).isNotNull();
assertThat(
sdkMeter
.counterBuilder("testDoubleCounter")
.ofDoubles()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build())
.isSameAs(doubleCounter);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.counterBuilder("testDoubleCounter")
.ofDoubles()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(() -> sdkMeter.counterBuilder("testDoubleCounter").ofDoubles().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() -> sdkMeter.counterBuilder("testDoubleCounter".toUpperCase()).ofDoubles().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -214,19 +218,20 @@ class SdkMeterTest {
.build();
assertThat(doubleUpDownCounter).isNotNull();
assertThat(
sdkMeter
.upDownCounterBuilder("testDoubleUpDownCounter")
.ofDoubles()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build())
.isSameAs(doubleUpDownCounter);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.upDownCounterBuilder("testDoubleUpDownCounter")
.ofDoubles()
.setDescription("My very own counter")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(
() -> sdkMeter.upDownCounterBuilder("testDoubleUpDownCounter").ofDoubles().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter
@ -234,7 +239,7 @@ class SdkMeterTest {
.ofDoubles()
.build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -247,21 +252,22 @@ class SdkMeterTest {
.build();
assertThat(doubleValueRecorder).isNotNull();
assertThat(
sdkMeter
.histogramBuilder("testDoubleValueRecorder")
.setDescription("My very own ValueRecorder")
.setUnit("metric tonnes")
.build())
.isSameAs(doubleValueRecorder);
// Note: We no longer get the same instrument instance as these instances are lightweight
// objects backed by storage now. Here we just make sure it doesn't throw to grab
// a second instance.
sdkMeter
.histogramBuilder("testDoubleValueRecorder")
.setDescription("My very own ValueRecorder")
.setUnit("metric tonnes")
.build();
assertThatThrownBy(() -> sdkMeter.histogramBuilder("testDoubleValueRecorder").build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() -> sdkMeter.histogramBuilder("testDoubleValueRecorder".toUpperCase()).build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -280,7 +286,7 @@ class SdkMeterTest {
.ofDoubles()
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter
@ -288,7 +294,7 @@ class SdkMeterTest {
.ofDoubles()
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -307,7 +313,7 @@ class SdkMeterTest {
.ofDoubles()
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter
@ -315,7 +321,7 @@ class SdkMeterTest {
.ofDoubles()
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
@ -329,13 +335,13 @@ class SdkMeterTest {
assertThatThrownBy(
() -> sdkMeter.gaugeBuilder("doubleValueObserver").buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
assertThatThrownBy(
() ->
sdkMeter
.gaugeBuilder("doubleValueObserver".toUpperCase())
.buildWithCallback(x -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Instrument with same name and different descriptor already created.");
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
}

View File

@ -139,7 +139,7 @@ class AggregatorFactoryTest {
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.LONG)))
.isInstanceOf(DoubleHistogramAggregator.class);
assertThat(
@ -150,7 +150,7 @@ class AggregatorFactoryTest {
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.DOUBLE)))
.isInstanceOf(DoubleHistogramAggregator.class);
@ -163,7 +163,7 @@ class AggregatorFactoryTest {
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.LONG))
.isStateful())
.isFalse();
@ -177,7 +177,7 @@ class AggregatorFactoryTest {
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentType.HISTOGRAM,
InstrumentValueType.DOUBLE))
.isStateful())
.isTrue();

View File

@ -26,22 +26,14 @@ class CountAggregatorTest {
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.LONG),
"name", "description", "unit", InstrumentType.HISTOGRAM, InstrumentValueType.LONG),
AggregationTemporality.CUMULATIVE);
private static final CountAggregator deltaAggregator =
new CountAggregator(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.LONG),
"name", "description", "unit", InstrumentType.HISTOGRAM, InstrumentValueType.LONG),
AggregationTemporality.DELTA);
@Test

View File

@ -32,11 +32,7 @@ public class DoubleHistogramAggregatorTest {
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.LONG),
"name", "description", "unit", InstrumentType.HISTOGRAM, InstrumentValueType.LONG),
boundaries,
/* stateful= */ false);

View File

@ -28,7 +28,7 @@ class DoubleLastValueAggregatorTest {
"name",
"description",
"unit",
InstrumentType.VALUE_OBSERVER,
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.DOUBLE));
@Test

View File

@ -31,11 +31,7 @@ class DoubleMinMaxSumCountAggregatorTest {
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.DOUBLE));
"name", "description", "unit", InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE));
@Test
void createHandle() {

View File

@ -29,7 +29,7 @@ class LongLastValueAggregatorTest {
"name",
"description",
"unit",
InstrumentType.VALUE_OBSERVER,
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG));
@Test

View File

@ -31,11 +31,7 @@ class LongMinMaxSumCountAggregatorTest {
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_RECORDER,
InstrumentValueType.LONG));
"name", "description", "unit", InstrumentType.HISTOGRAM, InstrumentValueType.LONG));
@Test
void createHandle() {

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
@ -12,6 +12,7 @@ import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
@ -21,7 +22,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class AsynchronousInstrumentAccumulatorTest {
public class AsynchronousMetricStorageTest {
private final TestClock testClock = TestClock.create();
private MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState =
@ -43,7 +44,7 @@ public class AsynchronousInstrumentAccumulatorTest {
ViewRegistry.builder()
.addView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.VALUE_OBSERVER)
.setInstrumentType(InstrumentType.OBSERVABLE_GAUGE)
.build(),
View.builder()
.setAggregatorFactory(AggregatorFactory.lastValue())
@ -58,33 +59,33 @@ public class AsynchronousInstrumentAccumulatorTest {
@Test
void doubleAsynchronousAccumulator_LabelsProcessor_used() {
AsynchronousInstrumentAccumulator.doubleAsynchronousAccumulator(
AsynchronousMetricStorage.doubleAsynchronousAccumulator(
meterProviderSharedState,
meterSharedState,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_OBSERVER,
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.DOUBLE),
value -> value.observe(1.0, Attributes.empty()))
.collectAll(testClock.nanoTime());
.collectAndReset(0, testClock.now());
Mockito.verify(spyLabelProcessor).onLabelsBound(Context.current(), Attributes.empty());
}
@Test
void longAsynchronousAccumulator_LabelsProcessor_used() {
AsynchronousInstrumentAccumulator.longAsynchronousAccumulator(
AsynchronousMetricStorage.longAsynchronousAccumulator(
meterProviderSharedState,
meterSharedState,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.VALUE_OBSERVER,
InstrumentType.OBSERVABLE_GAUGE,
InstrumentValueType.LONG),
value -> value.observe(1, Attributes.empty()))
.collectAll(testClock.nanoTime());
.collectAndReset(0, testClock.nanoTime());
Mockito.verify(spyLabelProcessor).onLabelsBound(Context.current(), Attributes.empty());
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import org.junit.jupiter.api.Test;
/** Unit tests for {@link MetricStorageRegistry}. */
class MetricStorageRegistryTest {
private static final MetricDescriptor METRIC_DESCRIPTOR =
MetricDescriptor.create("name", "description", "1");
private static final MetricDescriptor OTHER_METRIC_DESCRIPTOR =
MetricDescriptor.create("name", "other_description", "1");
@Test
void register() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestMetricStorage testInstrument = new TestMetricStorage(METRIC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThat(
meterSharedState
.getMetricStorageRegistry()
.register(new TestMetricStorage(METRIC_DESCRIPTOR)))
.isSameAs(testInstrument);
}
@Test
void register_OtherDescriptor() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestMetricStorage testInstrument = new TestMetricStorage(METRIC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThatThrownBy(
() ->
meterSharedState
.getMetricStorageRegistry()
.register(new TestMetricStorage(OTHER_METRIC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
void register_OtherInstance() {
MeterSharedState meterSharedState = MeterSharedState.create(InstrumentationLibraryInfo.empty());
TestMetricStorage testInstrument = new TestMetricStorage(METRIC_DESCRIPTOR);
assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
.isSameAs(testInstrument);
assertThatThrownBy(
() ->
meterSharedState
.getMetricStorageRegistry()
.register(new OtherTestMetricStorage(METRIC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Metric with same name and different instrument already created.");
}
private static final class TestMetricStorage implements WriteableMetricStorage {
private final MetricDescriptor descriptor;
TestMetricStorage(MetricDescriptor descriptor) {
this.descriptor = descriptor;
}
@Override
public MetricDescriptor getMetricDescriptor() {
return descriptor;
}
@Override
public MetricData collectAndReset(long startEpochNanos, long epochNanos) {
return null;
}
@Override
public BoundStorageHandle bind(Attributes attributes) {
return null;
}
}
private static final class OtherTestMetricStorage implements WriteableMetricStorage {
private final MetricDescriptor descriptor;
OtherTestMetricStorage(MetricDescriptor descriptor) {
this.descriptor = descriptor;
}
@Override
public MetricDescriptor getMetricDescriptor() {
return descriptor;
}
@Override
public MetricData collectAndReset(long startEpochNanos, long epochNanos) {
return null;
}
@Override
public BoundStorageHandle bind(Attributes attributes) {
return null;
}
}
}

View File

@ -3,9 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.metrics.MetricAssertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
@ -13,23 +14,24 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessor;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.time.TestClock;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class SynchronousInstrumentAccumulatorTest {
public class SynchronousMetricStorageTest {
private static final InstrumentDescriptor DESCRIPTOR =
InstrumentDescriptor.create(
"name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE);
private static final MetricDescriptor METRIC_DESCRIPTOR =
MetricDescriptor.create("name", "description", "unit");
private final TestClock testClock = TestClock.create();
private final Aggregator<Long> aggregator =
AggregatorFactory.lastValue()
@ -41,9 +43,12 @@ public class SynchronousInstrumentAccumulatorTest {
@Test
void labelsProcessor_used() {
LabelsProcessor spyLabelsProcessor = Mockito.spy(this.labelsProcessor);
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), spyLabelsProcessor);
SynchronousMetricStorage<?> accumulator =
new SynchronousMetricStorage<>(
METRIC_DESCRIPTOR,
aggregator,
new InstrumentProcessor<>(aggregator, testClock.now()),
spyLabelsProcessor);
accumulator.bind(Attributes.empty());
Mockito.verify(spyLabelsProcessor).onLabelsBound(Context.current(), Attributes.empty());
}
@ -59,44 +64,55 @@ public class SynchronousInstrumentAccumulatorTest {
}
};
LabelsProcessor spyLabelsProcessor = Mockito.spy(labelsProcessor);
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), spyLabelsProcessor);
AggregatorHandle<?> aggregatorHandle = accumulator.bind(labels);
aggregatorHandle.recordDouble(1);
List<MetricData> md = accumulator.collectAll(testClock.now());
md.stream()
.flatMap(m -> m.getLongGaugeData().getPoints().stream())
.forEach(
p -> assertThat(p.getAttributes()).hasSize(1).containsEntry("modifiedK", "modifiedV"));
SynchronousMetricStorage<?> accumulator =
new SynchronousMetricStorage<>(
METRIC_DESCRIPTOR,
aggregator,
new InstrumentProcessor<>(aggregator, testClock.now()),
spyLabelsProcessor);
BoundStorageHandle handle = accumulator.bind(labels);
handle.recordDouble(1, labels, Context.root());
MetricData md = accumulator.collectAndReset(0, testClock.now());
assertThat(md)
.hasDoubleGauge()
.points()
.allSatisfy(
p ->
assertThat(p)
.attributes()
.hasSize(2)
.containsEntry("modifiedK", "modifiedV")
.containsEntry("K", "V"));
}
@Test
void sameAggregator_ForSameLabelSet() {
SynchronousInstrumentAccumulator<?> accumulator =
new SynchronousInstrumentAccumulator<>(
aggregator, new InstrumentProcessor<>(aggregator, testClock.now()), labelsProcessor);
AggregatorHandle<?> aggregatorHandle =
accumulator.bind(Attributes.builder().put("K", "V").build());
AggregatorHandle<?> duplicateAggregatorHandle =
void sameAggregator_ForSameAttributes() {
SynchronousMetricStorage<?> accumulator =
new SynchronousMetricStorage<>(
METRIC_DESCRIPTOR,
aggregator,
new InstrumentProcessor<>(aggregator, testClock.now()),
labelsProcessor);
BoundStorageHandle handle = accumulator.bind(Attributes.builder().put("K", "V").build());
BoundStorageHandle duplicateHandle =
accumulator.bind(Attributes.builder().put("K", "V").build());
try {
assertThat(duplicateAggregatorHandle).isSameAs(aggregatorHandle);
accumulator.collectAll(testClock.now());
AggregatorHandle<?> anotherDuplicateAggregatorHandle =
assertThat(duplicateHandle).isSameAs(handle);
accumulator.collectAndReset(0, testClock.now());
BoundStorageHandle anotherDuplicateAggregatorHandle =
accumulator.bind(Attributes.builder().put("K", "V").build());
try {
assertThat(anotherDuplicateAggregatorHandle).isSameAs(aggregatorHandle);
assertThat(anotherDuplicateAggregatorHandle).isSameAs(handle);
} finally {
anotherDuplicateAggregatorHandle.release();
}
} finally {
duplicateAggregatorHandle.release();
aggregatorHandle.release();
duplicateHandle.release();
handle.release();
}
// At this point we should be able to unmap because all references are gone. Because this is an
// internal detail we cannot call collectAll after this anymore.
assertThat(aggregatorHandle.tryUnmap()).isTrue();
// If we try to collect once all bound references are gone AND no recordings have occurred, we
// should not see any labels (or metric).
assertThat(accumulator.collectAndReset(0, testClock.now())).isNull();
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
package io.opentelemetry.sdk.metrics.internal.view;
import static org.assertj.core.api.Assertions.assertThat;
@ -154,22 +154,22 @@ class ViewRegistryTest {
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
"", "", "", InstrumentType.VALUE_RECORDER, InstrumentValueType.LONG)))
"", "", "", InstrumentType.HISTOGRAM, InstrumentValueType.LONG)))
.isSameAs(ViewRegistry.SUMMARY);
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
"", "", "", InstrumentType.SUM_OBSERVER, InstrumentValueType.LONG)))
"", "", "", InstrumentType.OBSERVABLE_SUM, InstrumentValueType.LONG)))
.isSameAs(ViewRegistry.CUMULATIVE_SUM);
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
"", "", "", InstrumentType.VALUE_OBSERVER, InstrumentValueType.LONG)))
"", "", "", InstrumentType.OBSERVABLE_GAUGE, InstrumentValueType.LONG)))
.isSameAs(ViewRegistry.LAST_VALUE);
assertThat(
viewRegistry.findView(
InstrumentDescriptor.create(
"", "", "", InstrumentType.UP_DOWN_SUM_OBSERVER, InstrumentValueType.LONG)))
"", "", "", InstrumentType.OBSERVABLE_UP_DOWN_SUM, InstrumentValueType.LONG)))
.isSameAs(ViewRegistry.CUMULATIVE_SUM);
}
}