Delete notion of accumulation (#5154)

This commit is contained in:
jack-berg 2023-02-03 10:56:17 -06:00 committed by GitHub
parent f43905a535
commit 9cfdf67a49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 886 additions and 1036 deletions

View File

@ -5,14 +5,25 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
abstract class AbstractSumAggregator<T, U extends ExemplarData> implements Aggregator<T, U> {
abstract class AbstractSumAggregator<T extends PointData, U extends ExemplarData>
implements Aggregator<T, U> {
private final boolean isMonotonic;
AbstractSumAggregator(InstrumentDescriptor instrumentDescriptor) {
this.isMonotonic = MetricDataUtils.isMonotonicInstrument(instrumentDescriptor);
this.isMonotonic = isMonotonicInstrument(instrumentDescriptor);
}
/** Returns true if the instrument does not allow negative measurements. */
private static boolean isMonotonicInstrument(InstrumentDescriptor descriptor) {
InstrumentType type = descriptor.getType();
return type == InstrumentType.HISTOGRAM
|| type == InstrumentType.COUNTER
|| type == InstrumentType.OBSERVABLE_COUNTER;
}
final boolean isMonotonic() {

View File

@ -5,36 +5,30 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Collection;
import javax.annotation.concurrent.Immutable;
/**
* Aggregator represents the abstract class for all the available aggregations that can be computed
* during the accumulation phase for all the instrument.
*
* <p>The synchronous instruments will create an {@link AggregatorHandle} to record individual
* measurements synchronously, and for asynchronous the {@link #accumulateDoubleMeasurement} or
* {@link #accumulateLongMeasurement} will be used when reading values from the instrument
* callbacks.
* during the collection phase for all the instruments.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@Immutable
public interface Aggregator<T, U extends ExemplarData> {
public interface Aggregator<T extends PointData, U extends ExemplarData> {
/** Returns the drop aggregator, an aggregator that drops measurements. */
static Aggregator<Object, DoubleExemplarData> drop() {
static Aggregator<?, DoubleExemplarData> drop() {
return DropAggregator.INSTANCE;
}
@ -47,67 +41,41 @@ public interface Aggregator<T, U extends ExemplarData> {
AggregatorHandle<T, U> createHandle();
/**
* Returns a new {@code Accumulation} for the given value. This MUST be used by the asynchronous
* instruments to create {@code Accumulation} that are passed to the processor.
*
* @param value the given value to be used to create the {@code Accumulation}.
* @return a new {@code Accumulation} for the given value, or {@code null} if there are no
* recordings.
*/
@Nullable
default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordLong(value, attributes, context);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
}
/**
* Returns a new {@code Accumulation} for the given value. This MUST be used by the asynchronous
* instruments to create {@code Accumulation} that are passed to the processor.
*
* @param value the given value to be used to create the {@code Accumulation}.
* @return a new {@code Accumulation} for the given value, or {@code null} if there are no
* recordings.
*/
@Nullable
default T accumulateDoubleMeasurement(double value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordDouble(value, attributes, context);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
}
/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
* Returns a new DELTA point by computing the difference between two cumulative points.
*
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*
* @param previousCumulative the previously captured accumulation.
* @param currentCumulative the newly captured (cumulative) accumulation.
* @return The resulting delta accumulation.
* @param previousCumulative the previously captured point.
* @param currentCumulative the newly captured (cumulative) point.
* @return The resulting delta point.
*/
default T diff(T previousCumulative, T currentCumulative) {
throw new UnsupportedOperationException("This aggregator does not support diff.");
}
/**
* Return a new point representing the measurement.
*
* <p>Aggregators MUST implement diff if it can be used with asynchronous instruments.
*/
default T toPoint(Measurement measurement) {
throw new UnsupportedOperationException("This aggregator does not support toPoint.");
}
/**
* Returns the {@link MetricData} that this {@code Aggregation} will produce.
*
* @param resource the resource producing the metric.
* @param instrumentationScopeInfo the scope that instrumented the metric.
* @param metricDescriptor the name, description and unit of the metric.
* @param accumulationByLabels the map of Labels to Accumulation.
* @param temporality the temporality of the accumulation.
* @param startEpochNanos the startEpochNanos for the {@code Point}.
* @param epochNanos the epochNanos for the {@code Point}.
* @param points list of points
* @param temporality the temporality of the metric.
* @return the {@link MetricDataType} that this {@code Aggregation} will produce.
*/
MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor metricDescriptor,
Map<Attributes, T> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos);
Collection<T> points,
AggregationTemporality temporality);
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
@ -28,7 +29,7 @@ public interface AggregatorFactory {
* @return a new {@link Aggregator}. {@link Aggregator#drop()} indicates no measurements should be
* recorded.
*/
<T, U extends ExemplarData> Aggregator<T, U> createAggregator(
<T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter);
/**

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.BoundStorageHandle;
import java.util.List;
@ -32,7 +33,8 @@ import javax.annotation.concurrent.ThreadSafe;
* at any time.
*/
@ThreadSafe
public abstract class AggregatorHandle<T, U extends ExemplarData> implements BoundStorageHandle {
public abstract class AggregatorHandle<T extends PointData, U extends ExemplarData>
implements BoundStorageHandle {
// Atomically counts the number of references (usages) while also keeping a state of
// mapped/unmapped into a registry map.
private final AtomicLong refCountMapped;
@ -89,18 +91,29 @@ public abstract class AggregatorHandle<T, U extends ExemplarData> implements Bou
* current value in this {@code Aggregator}.
*/
@Nullable
public final T accumulateThenMaybeReset(Attributes attributes, boolean reset) {
public final T aggregateThenMaybeReset(
long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) {
if (!hasRecordings) {
return null;
}
if (reset) {
hasRecordings = false;
}
return doAccumulateThenMaybeReset(exemplarReservoir.collectAndReset(attributes), reset);
return doAggregateThenMaybeReset(
startEpochNanos,
epochNanos,
attributes,
exemplarReservoir.collectAndReset(attributes),
reset);
}
/** Implementation of the {@link #accumulateThenMaybeReset(Attributes, boolean)}. */
protected abstract T doAccumulateThenMaybeReset(List<U> exemplars, boolean reset);
/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
protected abstract T doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<U> exemplars,
boolean reset);
@Override
public final void recordLong(long value, Attributes attributes, Context context) {

View File

@ -1,41 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.aggregator;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import java.util.Collections;
import java.util.List;
import javax.annotation.concurrent.Immutable;
/**
* An accumulation representing {@code long} values and exemplars.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*
* <p>Visible for testing.
*/
@Immutable
@AutoValue
public abstract class DoubleAccumulation {
static DoubleAccumulation create(double value, List<DoubleExemplarData> exemplars) {
return new AutoValue_DoubleAccumulation(value, exemplars);
}
public static DoubleAccumulation create(double value) {
return create(value, Collections.emptyList());
}
DoubleAccumulation() {}
/** The current value. */
public abstract double getValue();
/** Sampled measurements recorded during this accumulation. */
abstract List<DoubleExemplarData> getExemplars();
}

View File

@ -8,19 +8,22 @@ package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@ -31,7 +34,7 @@ import java.util.function.Supplier;
* at any time.
*/
public final class DoubleExplicitBucketHistogramAggregator
implements Aggregator<ExplicitBucketHistogramAccumulation, DoubleExemplarData> {
implements Aggregator<HistogramPointData, DoubleExemplarData> {
private final double[] boundaries;
// a cache for converting to MetricData
@ -58,8 +61,8 @@ public final class DoubleExplicitBucketHistogramAggregator
}
@Override
public AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> createHandle() {
return new Handle(this.boundaries, reservoirSupplier.get());
public AggregatorHandle<HistogramPointData, DoubleExemplarData> createHandle() {
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get());
}
@Override
@ -67,30 +70,20 @@ public final class DoubleExplicitBucketHistogramAggregator
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor metricDescriptor,
Map<Attributes, ExplicitBucketHistogramAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<HistogramPointData> pointData,
AggregationTemporality temporality) {
return ImmutableMetricData.createDoubleHistogram(
resource,
instrumentationScopeInfo,
metricDescriptor.getName(),
metricDescriptor.getDescription(),
metricDescriptor.getSourceInstrument().getUnit(),
ImmutableHistogramData.create(
temporality,
MetricDataUtils.toExplicitBucketHistogramPointList(
accumulationByLabels,
(temporality == AggregationTemporality.CUMULATIVE)
? startEpochNanos
: lastCollectionEpoch,
epochNanos,
boundaryList)));
ImmutableHistogramData.create(temporality, pointData));
}
static final class Handle
extends AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExemplarData> {
// read-only
private final List<Double> boundaryList;
// read-only
private final double[] boundaries;
@ -111,8 +104,12 @@ public final class DoubleExplicitBucketHistogramAggregator
private final ReentrantLock lock = new ReentrantLock();
Handle(double[] boundaries, ExemplarReservoir<DoubleExemplarData> reservoir) {
Handle(
List<Double> boundaryList,
double[] boundaries,
ExemplarReservoir<DoubleExemplarData> reservoir) {
super(reservoir);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.counts = new long[this.boundaries.length + 1];
this.sum = 0;
@ -122,17 +119,24 @@ public final class DoubleExplicitBucketHistogramAggregator
}
@Override
protected ExplicitBucketHistogramAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
protected HistogramPointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
lock.lock();
try {
ExplicitBucketHistogramAccumulation acc =
ExplicitBucketHistogramAccumulation.create(
HistogramPointData pointData =
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.count > 0 ? this.min : -1,
this.count > 0 ? this.max : -1,
Arrays.copyOf(counts, counts.length),
this.count > 0 ? this.min : null,
this.count > 0 ? this.max : null,
boundaryList,
PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)),
exemplars);
if (reset) {
this.sum = 0;
@ -141,7 +145,7 @@ public final class DoubleExplicitBucketHistogramAggregator
this.count = 0;
Arrays.fill(this.counts, 0);
}
return acc;
return pointData;
} finally {
lock.unlock();
}

View File

@ -11,12 +11,15 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -31,7 +34,7 @@ import javax.annotation.Nullable;
* at any time.
*/
public final class DoubleExponentialHistogramAggregator
implements Aggregator<ExponentialHistogramAccumulation, DoubleExemplarData> {
implements Aggregator<ExponentialHistogramPointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final int maxBuckets;
@ -52,7 +55,7 @@ public final class DoubleExponentialHistogramAggregator
}
@Override
public AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> createHandle() {
public AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get(), maxBuckets, maxScale);
}
@ -61,29 +64,19 @@ public final class DoubleExponentialHistogramAggregator
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor metricDescriptor,
Map<Attributes, ExponentialHistogramAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<ExponentialHistogramPointData> points,
AggregationTemporality temporality) {
return ImmutableMetricData.createExponentialHistogram(
resource,
instrumentationScopeInfo,
metricDescriptor.getName(),
metricDescriptor.getDescription(),
metricDescriptor.getSourceInstrument().getUnit(),
ImmutableExponentialHistogramData.create(
temporality,
MetricDataUtils.toExponentialHistogramPointList(
accumulationByLabels,
(temporality == AggregationTemporality.CUMULATIVE)
? startEpochNanos
: lastCollectionEpoch,
epochNanos)));
ImmutableExponentialHistogramData.create(temporality, points));
}
static final class Handle
extends AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> {
extends AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> {
private final int maxBuckets;
@Nullable private DoubleExponentialHistogramBuckets positiveBuckets;
@Nullable private DoubleExponentialHistogramBuckets negativeBuckets;
@ -106,18 +99,24 @@ public final class DoubleExponentialHistogramAggregator
}
@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
ExponentialHistogramAccumulation acc =
ExponentialHistogramAccumulation.create(
protected synchronized ExponentialHistogramPointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
ExponentialHistogramPointData point =
ImmutableExponentialHistogramPointData.create(
scale,
sum,
this.count > 0,
this.count > 0 ? this.min : -1,
this.count > 0 ? this.max : -1,
zeroCount,
this.count > 0 ? this.min : null,
this.count > 0 ? this.max : null,
resolveBuckets(this.positiveBuckets, scale, reset),
resolveBuckets(this.negativeBuckets, scale, reset),
zeroCount,
startEpochNanos,
epochNanos,
attributes,
exemplars);
if (reset) {
this.sum = 0;
@ -126,7 +125,7 @@ public final class DoubleExponentialHistogramAggregator
this.max = -1;
this.count = 0;
}
return acc;
return point;
}
private static ExponentialHistogramBuckets resolveBuckets(
@ -176,7 +175,8 @@ public final class DoubleExponentialHistogramAggregator
// Record; If recording fails, calculate scale reduction and scale down to fit new value.
// 2nd attempt at recording should work with new scale
// TODO: We should experiment with downscale on demand during sync execution and only
// unifying scale factor between positive/negative at collection time (doAccumulate).
// unifying scale factor between positive/negative at collection time
// (doAggregateThenMaybeReset).
if (!buckets.record(value)) {
// getScaleReduction() used with downScale() will scale down as required to record value,
// fit inside max allowed buckets, and make sure index can be represented by int.

View File

@ -9,14 +9,17 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ -36,7 +39,7 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
public final class DoubleLastValueAggregator
implements Aggregator<DoubleAccumulation, DoubleExemplarData> {
implements Aggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
public DoubleLastValueAggregator(
@ -45,25 +48,31 @@ public final class DoubleLastValueAggregator
}
@Override
public AggregatorHandle<DoubleAccumulation, DoubleExemplarData> createHandle() {
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}
@Override
public DoubleAccumulation diff(DoubleAccumulation previous, DoubleAccumulation current) {
public DoublePointData diff(DoublePointData previous, DoublePointData current) {
return current;
}
@Override
public DoublePointData toPoint(Measurement measurement) {
return ImmutableDoublePointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}
@Override
public MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor descriptor,
Map<Attributes, DoubleAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpochNanos,
long epochNanos) {
Collection<DoublePointData> points,
AggregationTemporality temporality) {
// Gauge does not need a start time, but we send one as advised by the data model
// for identifying resets.
return ImmutableMetricData.createDoubleGauge(
@ -72,16 +81,10 @@ public final class DoubleLastValueAggregator
descriptor.getName(),
descriptor.getDescription(),
descriptor.getSourceInstrument().getUnit(),
ImmutableGaugeData.create(
MetricDataUtils.toDoublePointList(
accumulationByLabels,
(temporality == AggregationTemporality.CUMULATIVE)
? startEpochNanos
: lastCollectionEpochNanos,
epochNanos)));
ImmutableGaugeData.create(points));
}
static final class Handle extends AggregatorHandle<DoubleAccumulation, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<DoublePointData, DoubleExemplarData> {
@Nullable private static final Double DEFAULT_VALUE = null;
private final AtomicReference<Double> current = new AtomicReference<>(DEFAULT_VALUE);
@ -90,12 +93,15 @@ public final class DoubleLastValueAggregator
}
@Override
protected DoubleAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
}
return DoubleAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars);
protected DoublePointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
Double value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
@Override

View File

@ -6,21 +6,23 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.concurrent.AdderUtil;
import io.opentelemetry.sdk.metrics.internal.concurrent.DoubleAdder;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
/**
@ -30,7 +32,7 @@ import java.util.function.Supplier;
* at any time.
*/
public final class DoubleSumAggregator
extends AbstractSumAggregator<DoubleAccumulation, DoubleExemplarData> {
extends AbstractSumAggregator<DoublePointData, DoubleExemplarData> {
private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
/**
@ -48,21 +50,27 @@ public final class DoubleSumAggregator
}
@Override
public AggregatorHandle<DoubleAccumulation, DoubleExemplarData> createHandle() {
public AggregatorHandle<DoublePointData, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}
@Override
public DoubleAccumulation accumulateDoubleMeasurement(
double value, Attributes attributes, Context context) {
return DoubleAccumulation.create(value);
public DoublePointData diff(DoublePointData previousPoint, DoublePointData currentPoint) {
return ImmutableDoublePointData.create(
currentPoint.getStartEpochNanos(),
currentPoint.getEpochNanos(),
currentPoint.getAttributes(),
currentPoint.getValue() - previousPoint.getValue(),
currentPoint.getExemplars());
}
@Override
public DoubleAccumulation diff(
DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) {
return DoubleAccumulation.create(
accumulation.getValue() - previousAccumulation.getValue(), accumulation.getExemplars());
public DoublePointData toPoint(Measurement measurement) {
return ImmutableDoublePointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.doubleValue());
}
@Override
@ -70,29 +78,18 @@ public final class DoubleSumAggregator
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor descriptor,
Map<Attributes, DoubleAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<DoublePointData> points,
AggregationTemporality temporality) {
return ImmutableMetricData.createDoubleSum(
resource,
instrumentationScopeInfo,
descriptor.getName(),
descriptor.getDescription(),
descriptor.getSourceInstrument().getUnit(),
ImmutableSumData.create(
isMonotonic(),
temporality,
MetricDataUtils.toDoublePointList(
accumulationByLabels,
temporality == AggregationTemporality.CUMULATIVE
? startEpochNanos
: lastCollectionEpoch,
epochNanos)));
ImmutableSumData.create(isMonotonic(), temporality, points));
}
static final class Handle extends AggregatorHandle<DoubleAccumulation, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<DoublePointData, DoubleExemplarData> {
private final DoubleAdder current = AdderUtil.createDoubleAdder();
Handle(ExemplarReservoir<DoubleExemplarData> exemplarReservoir) {
@ -100,12 +97,15 @@ public final class DoubleSumAggregator
}
@Override
protected DoubleAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.sumThenReset(), exemplars);
}
return DoubleAccumulation.create(this.current.sum(), exemplars);
protected DoublePointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
double value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableDoublePointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
@Override

View File

@ -9,12 +9,15 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A "null object" Aggregator which denotes no aggregation should occur.
@ -22,31 +25,56 @@ import java.util.Map;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DropAggregator implements Aggregator<Object, DoubleExemplarData> {
public final class DropAggregator implements Aggregator<PointData, DoubleExemplarData> {
private static final Object ACCUMULATION = new Object();
private static final PointData POINT_DATA =
new PointData() {
@Override
public long getStartEpochNanos() {
return 0;
}
public static final Aggregator<Object, DoubleExemplarData> INSTANCE = new DropAggregator();
@Override
public long getEpochNanos() {
return 0;
}
@Override
public Attributes getAttributes() {
return Attributes.empty();
}
@Override
public List<? extends ExemplarData> getExemplars() {
return Collections.emptyList();
}
};
public static final Aggregator<PointData, DoubleExemplarData> INSTANCE = new DropAggregator();
private static final AggregatorHandle<PointData, DoubleExemplarData> HANDLE =
new AggregatorHandle<PointData, DoubleExemplarData>(ExemplarReservoir.doubleNoSamples()) {
@Override
protected PointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
return POINT_DATA;
}
private static final AggregatorHandle<Object, DoubleExemplarData> HANDLE =
new AggregatorHandle<Object, DoubleExemplarData>(ExemplarReservoir.doubleNoSamples()) {
@Override
protected void doRecordLong(long value) {}
@Override
protected void doRecordDouble(double value) {}
@Override
protected Object doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
return ACCUMULATION;
}
};
private DropAggregator() {}
@Override
public AggregatorHandle<Object, DoubleExemplarData> createHandle() {
public AggregatorHandle<PointData, DoubleExemplarData> createHandle() {
return HANDLE;
}
@ -54,12 +82,9 @@ public final class DropAggregator implements Aggregator<Object, DoubleExemplarDa
public MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor descriptor,
Map<Attributes, Object> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
MetricDescriptor metricDescriptor,
Collection<PointData> points,
AggregationTemporality temporality) {
return EmptyMetricData.getInstance();
}
}

View File

@ -1,74 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.aggregator;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import java.util.Collections;
import java.util.List;
import javax.annotation.concurrent.Immutable;
@Immutable
@AutoValue
abstract class ExplicitBucketHistogramAccumulation {
/**
* Creates a new {@link ExplicitBucketHistogramAccumulation} with the given values. Assume
* `counts` is read-only so we don't need a defensive-copy here.
*
* @return a new {@link ExplicitBucketHistogramAccumulation} with the given values.
*/
static ExplicitBucketHistogramAccumulation create(
double sum, boolean hasMinMax, double min, double max, long[] counts) {
return create(sum, hasMinMax, min, max, counts, Collections.emptyList());
}
static ExplicitBucketHistogramAccumulation create(
double sum,
boolean hasMinMax,
double min,
double max,
long[] counts,
List<DoubleExemplarData> exemplars) {
return new AutoValue_ExplicitBucketHistogramAccumulation(
sum, hasMinMax, min, max, counts, exemplars);
}
ExplicitBucketHistogramAccumulation() {}
/**
* The sum of all measurements recorded.
*
* @return the sum of recorded measurements.
*/
abstract double getSum();
/** Return {@code true} if {@link #getMin()} and {@link #getMax()} is set. */
abstract boolean hasMinMax();
/**
* The min of all measurements recorded, if {@link #hasMinMax()} is {@code true}. If {@link
* #hasMinMax()} is {@code false}, the response should be ignored.
*/
abstract double getMin();
/**
* The max of all measurements recorded, if {@link #hasMinMax()} is {@code true}. If {@link
* #hasMinMax()} is {@code false}, the response should be ignored.
*/
abstract double getMax();
/**
* The counts in each bucket. The returned type is a mutable object, but it should be fine because
* the class is only used internally.
*
* @return the counts in each bucket. <b>do not mutate</b> the returned object.
*/
@SuppressWarnings("mutable")
abstract long[] getCounts();
/** Exemplars accumulated during this period. */
abstract List<DoubleExemplarData> getExemplars();
}

View File

@ -1,49 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.aggregator;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import java.util.List;
@AutoValue
abstract class ExponentialHistogramAccumulation {
ExponentialHistogramAccumulation() {}
/** Creates a new {@link ExponentialHistogramAccumulation} with the given values. */
static ExponentialHistogramAccumulation create(
int scale,
double sum,
boolean hasMinMax,
double min,
double max,
ExponentialHistogramBuckets positiveBuckets,
ExponentialHistogramBuckets negativeBuckets,
long zeroCount,
List<DoubleExemplarData> exemplars) {
return new AutoValue_ExponentialHistogramAccumulation(
scale, sum, hasMinMax, min, max, positiveBuckets, negativeBuckets, zeroCount, exemplars);
}
abstract int getScale();
abstract double getSum();
abstract boolean hasMinMax();
abstract double getMin();
abstract double getMax();
abstract ExponentialHistogramBuckets getPositiveBuckets();
abstract ExponentialHistogramBuckets getNegativeBuckets();
abstract long getZeroCount();
abstract List<DoubleExemplarData> getExemplars();
}

View File

@ -1,34 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.aggregator;
import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import java.util.Collections;
import java.util.List;
import javax.annotation.concurrent.Immutable;
/** An accumulation representing {@code long} values and exemplars. */
@Immutable
@AutoValue
abstract class LongAccumulation {
static LongAccumulation create(long value, List<LongExemplarData> exemplars) {
return new AutoValue_LongAccumulation(value, exemplars);
}
static LongAccumulation create(long value) {
return create(value, Collections.emptyList());
}
LongAccumulation() {}
/** The current value. */
abstract long getValue();
/** Sampled measurements recorded during this accumulation. */
abstract List<LongExemplarData> getExemplars();
}

View File

@ -9,14 +9,17 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ -33,8 +36,7 @@ import javax.annotation.Nullable;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class LongLastValueAggregator
implements Aggregator<LongAccumulation, LongExemplarData> {
public final class LongLastValueAggregator implements Aggregator<LongPointData, LongExemplarData> {
private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
public LongLastValueAggregator(Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier) {
@ -42,25 +44,31 @@ public final class LongLastValueAggregator
}
@Override
public AggregatorHandle<LongAccumulation, LongExemplarData> createHandle() {
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}
@Override
public LongAccumulation diff(LongAccumulation previous, LongAccumulation current) {
public LongPointData diff(LongPointData previous, LongPointData current) {
return current;
}
@Override
public LongPointData toPoint(Measurement measurement) {
return ImmutableLongPointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}
@Override
public MetricData toMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor descriptor,
Map<Attributes, LongAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<LongPointData> points,
AggregationTemporality temporality) {
// Last-Value ignores temporality generally, but we can set a start time on the gauge.
return ImmutableMetricData.createLongGauge(
resource,
@ -68,16 +76,10 @@ public final class LongLastValueAggregator
descriptor.getName(),
descriptor.getDescription(),
descriptor.getSourceInstrument().getUnit(),
ImmutableGaugeData.create(
MetricDataUtils.toLongPointList(
accumulationByLabels,
(temporality == AggregationTemporality.CUMULATIVE)
? startEpochNanos
: lastCollectionEpoch,
epochNanos)));
ImmutableGaugeData.create(points));
}
static final class Handle extends AggregatorHandle<LongAccumulation, LongExemplarData> {
static final class Handle extends AggregatorHandle<LongPointData, LongExemplarData> {
@Nullable private static final Long DEFAULT_VALUE = null;
private final AtomicReference<Long> current = new AtomicReference<>(DEFAULT_VALUE);
@ -86,12 +88,15 @@ public final class LongLastValueAggregator
}
@Override
protected LongAccumulation doAccumulateThenMaybeReset(
List<LongExemplarData> exemplars, boolean reset) {
if (reset) {
return LongAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
}
return LongAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars);
protected LongPointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<LongExemplarData> exemplars,
boolean reset) {
Long value = reset ? this.current.getAndSet(DEFAULT_VALUE) : this.current.get();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, Objects.requireNonNull(value), exemplars);
}
@Override

View File

@ -9,17 +9,20 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.concurrent.AdderUtil;
import io.opentelemetry.sdk.metrics.internal.concurrent.LongAdder;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.Measurement;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
/**
@ -29,7 +32,7 @@ import java.util.function.Supplier;
* at any time.
*/
public final class LongSumAggregator
extends AbstractSumAggregator<LongAccumulation, LongExemplarData> {
extends AbstractSumAggregator<LongPointData, LongExemplarData> {
private final Supplier<ExemplarReservoir<LongExemplarData>> reservoirSupplier;
@ -41,15 +44,27 @@ public final class LongSumAggregator
}
@Override
public AggregatorHandle<LongAccumulation, LongExemplarData> createHandle() {
public AggregatorHandle<LongPointData, LongExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}
@Override
public LongAccumulation diff(
LongAccumulation previousAccumulation, LongAccumulation accumulation) {
return LongAccumulation.create(
accumulation.getValue() - previousAccumulation.getValue(), accumulation.getExemplars());
public LongPointData diff(LongPointData previousPoint, LongPointData currentPoint) {
return ImmutableLongPointData.create(
currentPoint.getStartEpochNanos(),
currentPoint.getEpochNanos(),
currentPoint.getAttributes(),
currentPoint.getValue() - previousPoint.getValue(),
currentPoint.getExemplars());
}
@Override
public LongPointData toPoint(Measurement measurement) {
return ImmutableLongPointData.create(
measurement.startEpochNanos(),
measurement.epochNanos(),
measurement.attributes(),
measurement.longValue());
}
@Override
@ -57,29 +72,18 @@ public final class LongSumAggregator
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
MetricDescriptor descriptor,
Map<Attributes, LongAccumulation> accumulationByLabels,
AggregationTemporality temporality,
long startEpochNanos,
long lastCollectionEpoch,
long epochNanos) {
Collection<LongPointData> points,
AggregationTemporality temporality) {
return ImmutableMetricData.createLongSum(
resource,
instrumentationScopeInfo,
descriptor.getName(),
descriptor.getDescription(),
descriptor.getSourceInstrument().getUnit(),
ImmutableSumData.create(
isMonotonic(),
temporality,
MetricDataUtils.toLongPointList(
accumulationByLabels,
temporality == AggregationTemporality.CUMULATIVE
? startEpochNanos
: lastCollectionEpoch,
epochNanos)));
ImmutableSumData.create(isMonotonic(), temporality, points));
}
static final class Handle extends AggregatorHandle<LongAccumulation, LongExemplarData> {
static final class Handle extends AggregatorHandle<LongPointData, LongExemplarData> {
private final LongAdder current = AdderUtil.createLongAdder();
Handle(ExemplarReservoir<LongExemplarData> exemplarReservoir) {
@ -87,12 +91,15 @@ public final class LongSumAggregator
}
@Override
protected LongAccumulation doAccumulateThenMaybeReset(
List<LongExemplarData> exemplars, boolean reset) {
if (reset) {
return LongAccumulation.create(this.current.sumThenReset(), exemplars);
}
return LongAccumulation.create(this.current.sum(), exemplars);
protected LongPointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<LongExemplarData> exemplars,
boolean reset) {
long value = reset ? this.current.sumThenReset() : this.current.sum();
return ImmutableLongPointData.create(
startEpochNanos, epochNanos, attributes, value, exemplars);
}
@Override

View File

@ -1,111 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.aggregator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
final class MetricDataUtils {
private MetricDataUtils() {}
/** Returns true if the instrument does not allow negative measurements. */
static boolean isMonotonicInstrument(InstrumentDescriptor descriptor) {
InstrumentType type = descriptor.getType();
return type == InstrumentType.HISTOGRAM
|| type == InstrumentType.COUNTER
|| type == InstrumentType.OBSERVABLE_COUNTER;
}
static List<LongPointData> toLongPointList(
Map<Attributes, LongAccumulation> accumulationMap, long startEpochNanos, long epochNanos) {
List<LongPointData> points = new ArrayList<>(accumulationMap.size());
accumulationMap.forEach(
(labels, accumulation) ->
points.add(
ImmutableLongPointData.create(
startEpochNanos,
epochNanos,
labels,
accumulation.getValue(),
accumulation.getExemplars())));
return points;
}
static List<DoublePointData> toDoublePointList(
Map<Attributes, DoubleAccumulation> accumulationMap, long startEpochNanos, long epochNanos) {
List<DoublePointData> points = new ArrayList<>(accumulationMap.size());
accumulationMap.forEach(
(labels, accumulation) ->
points.add(
ImmutableDoublePointData.create(
startEpochNanos,
epochNanos,
labels,
accumulation.getValue(),
accumulation.getExemplars())));
return points;
}
static List<HistogramPointData> toExplicitBucketHistogramPointList(
Map<Attributes, ExplicitBucketHistogramAccumulation> accumulationMap,
long startEpochNanos,
long epochNanos,
List<Double> boundaries) {
List<HistogramPointData> points = new ArrayList<>(accumulationMap.size());
accumulationMap.forEach(
(labels, aggregator) -> {
List<Long> counts = PrimitiveLongList.wrap(aggregator.getCounts().clone());
points.add(
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
labels,
aggregator.getSum(),
aggregator.getMin(),
aggregator.getMax(),
boundaries,
counts,
aggregator.getExemplars()));
});
return points;
}
static List<ExponentialHistogramPointData> toExponentialHistogramPointList(
Map<Attributes, ExponentialHistogramAccumulation> accumulationMap,
long startEpochNanos,
long epochNanos) {
List<ExponentialHistogramPointData> points = new ArrayList<>(accumulationMap.size());
accumulationMap.forEach(
(attributes, aggregator) ->
points.add(
ImmutableExponentialHistogramPointData.create(
aggregator.getScale(),
aggregator.getSum(),
aggregator.getZeroCount(),
aggregator.getMin(),
aggregator.getMax(),
aggregator.getPositiveBuckets(),
aggregator.getNegativeBuckets(),
startEpochNanos,
epochNanos,
attributes,
aggregator.getExemplars())));
return points;
}
}

View File

@ -15,6 +15,7 @@ import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@ -35,7 +36,8 @@ import java.util.logging.Logger;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class AsynchronousMetricStorage<T, U extends ExemplarData> implements MetricStorage {
final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarData>
implements MetricStorage {
private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName());
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
@ -44,8 +46,8 @@ final class AsynchronousMetricStorage<T, U extends ExemplarData> implements Metr
private final AggregationTemporality aggregationTemporality;
private final Aggregator<T, U> aggregator;
private final AttributesProcessor attributesProcessor;
private Map<Attributes, T> accumulations = new HashMap<>();
private Map<Attributes, T> lastAccumulations =
private Map<Attributes, T> points = new HashMap<>();
private Map<Attributes, T> lastPoints =
new HashMap<>(); // Only populated if aggregationTemporality == DELTA
private AsynchronousMetricStorage(
@ -67,7 +69,7 @@ final class AsynchronousMetricStorage<T, U extends ExemplarData> implements Metr
* Create an asynchronous storage instance for the {@link View} and {@link InstrumentDescriptor}.
*/
// TODO(anuraaga): The cast to generic type here looks suspicious.
static <T, U extends ExemplarData> AsynchronousMetricStorage<T, U> create(
static <T extends PointData, U extends ExemplarData> AsynchronousMetricStorage<T, U> create(
RegisteredReader registeredReader,
RegisteredView registeredView,
InstrumentDescriptor instrumentDescriptor) {
@ -84,38 +86,42 @@ final class AsynchronousMetricStorage<T, U extends ExemplarData> implements Metr
registeredView.getViewAttributesProcessor());
}
/** Record callback long measurements from {@link ObservableLongMeasurement}. */
void recordLong(long value, Attributes attributes) {
T accumulation = aggregator.accumulateLongMeasurement(value, attributes, Context.current());
if (accumulation != null) {
recordAccumulation(accumulation, attributes);
}
/**
* Record callback measurement from {@link ObservableLongMeasurement} or {@link
* ObservableDoubleMeasurement}.
*/
void record(Measurement measurement) {
Context context = Context.current();
Attributes processedAttributes = attributesProcessor.process(measurement.attributes(), context);
long start =
aggregationTemporality == AggregationTemporality.DELTA
? registeredReader.getLastCollectEpochNanos()
: measurement.startEpochNanos();
measurement =
measurement.hasDoubleValue()
? Measurement.doubleMeasurement(
start, measurement.epochNanos(), measurement.doubleValue(), processedAttributes)
: Measurement.longMeasurement(
start, measurement.epochNanos(), measurement.longValue(), processedAttributes);
recordPoint(aggregator.toPoint(measurement));
}
/** Record callback double measurements from {@link ObservableDoubleMeasurement}. */
void recordDouble(double value, Attributes attributes) {
T accumulation = aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
if (accumulation != null) {
recordAccumulation(accumulation, attributes);
}
}
private void recordPoint(T point) {
Attributes attributes = point.getAttributes();
private void recordAccumulation(T accumulation, Attributes attributes) {
Attributes processedAttributes = attributesProcessor.process(attributes, Context.current());
if (accumulations.size() >= MetricStorage.MAX_ACCUMULATIONS) {
if (points.size() >= MetricStorage.MAX_CARDINALITY) {
throttlingLogger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed accumulations ("
+ MetricStorage.MAX_ACCUMULATIONS
+ " has exceeded the maximum allowed cardinality ("
+ MetricStorage.MAX_CARDINALITY
+ ").");
return;
}
// Check there is not already a recording for the attributes
if (accumulations.containsKey(attributes)) {
if (points.containsKey(attributes)) {
throttlingLogger.log(
Level.WARNING,
"Instrument "
@ -124,7 +130,7 @@ final class AsynchronousMetricStorage<T, U extends ExemplarData> implements Metr
return;
}
accumulations.put(processedAttributes, accumulation);
points.put(attributes, point);
}
@Override
@ -145,27 +151,23 @@ final class AsynchronousMetricStorage<T, U extends ExemplarData> implements Metr
long epochNanos) {
Map<Attributes, T> result;
if (aggregationTemporality == AggregationTemporality.DELTA) {
Map<Attributes, T> accumulations = this.accumulations;
Map<Attributes, T> lastAccumulations = this.lastAccumulations;
lastAccumulations.entrySet().removeIf(entry -> !accumulations.containsKey(entry.getKey()));
accumulations.forEach(
(k, v) ->
lastAccumulations.compute(k, (k2, v2) -> v2 == null ? v : aggregator.diff(v2, v)));
result = lastAccumulations;
this.lastAccumulations = accumulations;
Map<Attributes, T> points = this.points;
Map<Attributes, T> lastPoints = this.lastPoints;
lastPoints.entrySet().removeIf(entry -> !points.containsKey(entry.getKey()));
points.forEach(
(k, v) -> lastPoints.compute(k, (k2, v2) -> v2 == null ? v : aggregator.diff(v2, v)));
result = lastPoints;
this.lastPoints = points;
} else {
result = accumulations;
result = points;
}
this.accumulations = new HashMap<>();
this.points = new HashMap<>();
return aggregator.toMetricData(
resource,
instrumentationScopeInfo,
metricDescriptor,
result,
aggregationTemporality,
startEpochNanos,
registeredReader.getLastCollectEpochNanos(),
epochNanos);
result.values(),
aggregationTemporality);
}
@Override

View File

@ -70,7 +70,7 @@ public final class CallbackRegistration {
return "CallbackRegistration{instrumentDescriptors=" + instrumentDescriptors + "}";
}
void invokeCallback(RegisteredReader reader) {
void invokeCallback(RegisteredReader reader, long startEpochNanos, long epochNanos) {
// Return early if no storages are registered
if (!hasStorages) {
return;
@ -78,7 +78,8 @@ public final class CallbackRegistration {
// Set the active reader on each observable measurement so that measurements are only recorded
// to relevant storages
observableMeasurements.forEach(
observableMeasurement -> observableMeasurement.setActiveReader(reader));
observableMeasurement ->
observableMeasurement.setActiveReader(reader, startEpochNanos, epochNanos));
try {
callback.run();
} catch (Throwable e) {
@ -87,7 +88,7 @@ public final class CallbackRegistration {
Level.WARNING, "An exception occurred invoking callback for " + this + ".", e);
} finally {
observableMeasurements.forEach(
observableMeasurement -> observableMeasurement.setActiveReader(null));
observableMeasurement -> observableMeasurement.unsetActiveReader());
}
}
}

View File

@ -12,6 +12,7 @@ import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData;
@ -19,7 +20,8 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.resources.Resource;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@ -32,7 +34,7 @@ import java.util.logging.Logger;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultSynchronousMetricStorage<T, U extends ExemplarData>
public final class DefaultSynchronousMetricStorage<T extends PointData, U extends ExemplarData>
implements SynchronousMetricStorage {
private static final BoundStorageHandle NOOP_STORAGE_HANDLE = new NoopBoundHandle();
@ -100,13 +102,13 @@ public final class DefaultSynchronousMetricStorage<T, U extends ExemplarData>
// Missing entry or no longer mapped. Try to add a new one if not exceeded cardinality limits.
aggregatorHandle = aggregator.createHandle();
while (true) {
if (activeCollectionStorage.size() >= MAX_ACCUMULATIONS) {
if (activeCollectionStorage.size() >= MAX_CARDINALITY) {
logger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed accumulations ("
+ MAX_ACCUMULATIONS
+ " has exceeded the maximum allowed cardinality ("
+ MAX_CARDINALITY
+ ").");
return NOOP_STORAGE_HANDLE;
}
@ -159,9 +161,13 @@ public final class DefaultSynchronousMetricStorage<T, U extends ExemplarData>
long startEpochNanos,
long epochNanos) {
boolean reset = aggregationTemporality == AggregationTemporality.DELTA;
long start =
aggregationTemporality == AggregationTemporality.DELTA
? registeredReader.getLastCollectEpochNanos()
: startEpochNanos;
// Grab accumulated measurements.
Map<Attributes, T> accumulations = new HashMap<>();
// Grab aggregated points.
List<T> points = new ArrayList<>(activeCollectionStorage.size());
for (Map.Entry<Attributes, AggregatorHandle<T, U>> entry : activeCollectionStorage.entrySet()) {
if (reset) {
boolean unmappedEntry = entry.getValue().tryUnmap();
@ -171,26 +177,19 @@ public final class DefaultSynchronousMetricStorage<T, U extends ExemplarData>
activeCollectionStorage.remove(entry.getKey(), entry.getValue());
}
}
T accumulation = entry.getValue().accumulateThenMaybeReset(entry.getKey(), reset);
if (accumulation == null) {
T point = entry.getValue().aggregateThenMaybeReset(start, epochNanos, entry.getKey(), reset);
if (point == null) {
continue;
}
accumulations.put(entry.getKey(), accumulation);
points.add(point);
}
if (accumulations.isEmpty()) {
if (points.isEmpty()) {
return EmptyMetricData.getInstance();
}
return aggregator.toMetricData(
resource,
instrumentationScopeInfo,
metricDescriptor,
accumulations,
aggregationTemporality,
startEpochNanos,
registeredReader.getLastCollectEpochNanos(),
epochNanos);
resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality);
}
@Override

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics.internal.state;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
/**
* A long or double measurement recorded from {@link ObservableLongMeasurement} or {@link
* ObservableDoubleMeasurement}.
*/
@AutoValue
public abstract class Measurement {
static Measurement doubleMeasurement(
long startEpochNanos, long epochNanos, double value, Attributes attributes) {
return new AutoValue_Measurement(
startEpochNanos,
epochNanos,
/* hasLongValue= */ false,
0L,
/* hasDoubleValue= */ true,
value,
attributes);
}
static Measurement longMeasurement(
long startEpochNanos, long epochNanos, long value, Attributes attributes) {
return new AutoValue_Measurement(
startEpochNanos,
epochNanos,
/* hasLongValue= */ true,
value,
/* hasDoubleValue= */ false,
0.0,
attributes);
}
public abstract long startEpochNanos();
public abstract long epochNanos();
public abstract boolean hasLongValue();
public abstract long longValue();
public abstract boolean hasDoubleValue();
public abstract double doubleValue();
public abstract Attributes attributes();
}

View File

@ -81,7 +81,7 @@ public class MeterSharedState {
return instrumentationScopeInfo;
}
/** Collects all accumulated metric stream points. */
/** Collects all metrics. */
public List<MetricData> collectAll(
RegisteredReader registeredReader,
MeterProviderSharedState meterProviderSharedState,
@ -93,7 +93,8 @@ public class MeterSharedState {
// Collections across all readers are sequential
synchronized (collectLock) {
for (CallbackRegistration callbackRegistration : currentRegisteredCallbacks) {
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(
registeredReader, meterProviderSharedState.getStartEpochNanos(), epochNanos);
}
Collection<MetricStorage> storages =

View File

@ -20,8 +20,8 @@ import io.opentelemetry.sdk.resources.Resource;
*/
public interface MetricStorage {
/** The max number of metric accumulations for a particular {@link MetricStorage}. */
int MAX_ACCUMULATIONS = 2000;
/** The max number of distinct metric points for a particular {@link MetricStorage}. */
int MAX_CARDINALITY = 2000;
/** Returns a description of the metric produced in this storage. */
MetricDescriptor getMetricDescriptor();

View File

@ -5,6 +5,9 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
@ -32,7 +35,12 @@ public final class SdkObservableMeasurement
private final InstrumentationScopeInfo instrumentationScopeInfo;
private final InstrumentDescriptor instrumentDescriptor;
private final List<AsynchronousMetricStorage<?, ?>> storages;
// These fields are set before invoking callbacks. They allow measurements to be recorded to the
// storages for correct reader, and with the correct time.
@Nullable private volatile RegisteredReader activeReader;
private volatile long startEpochNanos;
private volatile long epochNanos;
private SdkObservableMeasurement(
InstrumentationScopeInfo instrumentationScopeInfo,
@ -63,8 +71,22 @@ public final class SdkObservableMeasurement
return instrumentationScopeInfo;
}
public void setActiveReader(@Nullable RegisteredReader registeredReader) {
/**
* Set the active reader, and clock information. {@link #unsetActiveReader()} MUST be called
* after.
*/
public void setActiveReader(
RegisteredReader registeredReader, long startEpochNanos, long epochNanos) {
this.activeReader = registeredReader;
this.startEpochNanos = startEpochNanos;
this.epochNanos = epochNanos;
}
/**
* Unset the active reader. Called after {@link #setActiveReader(RegisteredReader, long, long)}.
*/
public void unsetActiveReader() {
this.activeReader = null;
}
InstrumentDescriptor getInstrumentDescriptor() {
@ -82,20 +104,7 @@ public final class SdkObservableMeasurement
@Override
public void record(long value, Attributes attributes) {
RegisteredReader activeReader = this.activeReader;
if (activeReader == null) {
throttlingLogger.log(
Level.FINE,
"Measurement recorded for instrument "
+ instrumentDescriptor.getName()
+ " outside callback registered to instrument. Dropping measurement.");
return;
}
for (AsynchronousMetricStorage<?, ?> storage : storages) {
if (storage.getRegisteredReader().equals(activeReader)) {
storage.recordLong(value, attributes);
}
}
doRecord(longMeasurement(startEpochNanos, epochNanos, value, attributes));
}
@Override
@ -105,6 +114,10 @@ public final class SdkObservableMeasurement
@Override
public void record(double value, Attributes attributes) {
doRecord(doubleMeasurement(startEpochNanos, epochNanos, value, attributes));
}
private void doRecord(Measurement measurement) {
RegisteredReader activeReader = this.activeReader;
if (activeReader == null) {
throttlingLogger.log(
@ -116,7 +129,7 @@ public final class SdkObservableMeasurement
}
for (AsynchronousMetricStorage<?, ?> storage : storages) {
if (storage.getRegisteredReader().equals(activeReader)) {
storage.recordDouble(value, attributes);
storage.record(measurement);
}
}
}

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.state;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@ -35,7 +36,7 @@ public interface SynchronousMetricStorage extends MetricStorage, WriteableMetric
* @return The storage, or {@link EmptyMetricStorage#empty()} if the instrument should not be
* recorded.
*/
static <T, U extends ExemplarData> SynchronousMetricStorage create(
static <T extends PointData, U extends ExemplarData> SynchronousMetricStorage create(
RegisteredReader registeredReader,
RegisteredView registeredView,
InstrumentDescriptor instrumentDescriptor,

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@ -51,7 +52,7 @@ public final class DefaultAggregation implements Aggregation, AggregatorFactory
}
@Override
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return ((AggregatorFactory) resolve(instrumentDescriptor))
.createAggregator(instrumentDescriptor, exemplarFilter);

View File

@ -7,6 +7,7 @@ package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
@ -30,7 +31,7 @@ public final class DropAggregation implements Aggregation, AggregatorFactory {
@Override
@SuppressWarnings("unchecked")
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return (Aggregator<T, U>) Aggregator.drop();
}

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleExplicitBucketHistogramAggregator;
@ -48,7 +49,7 @@ public final class ExplicitBucketHistogramAggregation implements Aggregation, Ag
@Override
@SuppressWarnings("unchecked")
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return (Aggregator<T, U>)
new DoubleExplicitBucketHistogramAggregator(

View File

@ -12,6 +12,7 @@ import io.opentelemetry.sdk.internal.RandomSupplier;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleExponentialHistogramAggregator;
@ -64,7 +65,7 @@ public final class ExponentialHistogramAggregation implements Aggregation, Aggre
@Override
@SuppressWarnings("unchecked")
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return (Aggregator<T, U>)
new DoubleExponentialHistogramAggregator(

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.view;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleLastValueAggregator;
@ -34,7 +35,7 @@ public final class LastValueAggregation implements Aggregation, AggregatorFactor
@Override
@SuppressWarnings("unchecked")
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
// For the initial version we do not sample exemplars on gauges.

View File

@ -11,6 +11,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleSumAggregator;
@ -37,7 +38,7 @@ public final class SumAggregation implements Aggregation, AggregatorFactory {
@Override
@SuppressWarnings("unchecked")
public <T, U extends ExemplarData> Aggregator<T, U> createAggregator(
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
switch (instrumentDescriptor.getValueType()) {
case LONG:

View File

@ -26,8 +26,8 @@ import org.junit.jupiter.api.Test;
@SuppressLogger(DefaultSynchronousMetricStorage.class)
class CardinalityTest {
/** Traces {@code MetricStorageUtils#MAX_ACCUMULATIONS}. */
private static final int MAX_ACCUMULATIONS = 2000;
/** Traces {@code MetricStorageUtils#MAX_CARDINALITY}. */
private static final int MAX_CARDINALITY = 2000;
private InMemoryMetricReader deltaReader;
private InMemoryMetricReader cumulativeReader;
@ -159,13 +159,13 @@ class CardinalityTest {
/**
* Records to sync instruments, many distinct attributes. Validates that the {@code
* MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument.
* MetricStorageUtils#MAX_CARDINALITY} is enforced for each instrument.
*/
@Test
void cardinalityLimits_synchronousInstrument() {
LongCounter syncCounter1 = meter.counterBuilder("sync-counter1").build();
LongCounter syncCounter2 = meter.counterBuilder("sync-counter2").build();
for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) {
for (int i = 0; i < MAX_CARDINALITY + 1; i++) {
syncCounter1.add(1, Attributes.builder().put("key", "value" + i).build());
syncCounter2.add(1, Attributes.builder().put("key", "value" + i).build());
}
@ -183,7 +183,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))),
.isEqualTo(MAX_CARDINALITY))),
metricData ->
assertThat(metricData)
.hasName("sync-counter2")
@ -194,7 +194,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))));
.isEqualTo(MAX_CARDINALITY))));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection")
@ -209,7 +209,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))),
.isEqualTo(MAX_CARDINALITY))),
metricData ->
assertThat(metricData)
.hasName("sync-counter2")
@ -220,18 +220,18 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))));
.isEqualTo(MAX_CARDINALITY))));
}
/**
* Records to sync instruments, many distinct attributes. Validates that the {@code
* MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument.
* MetricStorageUtils#MAX_CARDINALITY} is enforced for each instrument.
*/
@Test
void cardinalityLimits_asynchronousInstrument() {
Consumer<ObservableLongMeasurement> callback =
measurement -> {
for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) {
for (int i = 0; i < MAX_CARDINALITY + 1; i++) {
measurement.record(1, Attributes.builder().put("key", "value" + i).build());
}
};
@ -251,7 +251,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))),
.isEqualTo(MAX_CARDINALITY))),
metricData ->
assertThat(metricData)
.hasName("async-counter2")
@ -262,7 +262,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))));
.isEqualTo(MAX_CARDINALITY))));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection")
@ -277,7 +277,7 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))),
.isEqualTo(MAX_CARDINALITY))),
metricData ->
assertThat(metricData)
.hasName("async-counter2")
@ -288,6 +288,6 @@ class CardinalityTest {
(Consumer<SumData<LongPointData>>)
sumPointData ->
assertThat(sumPointData.getPoints().size())
.isEqualTo(MAX_ACCUMULATIONS))));
.isEqualTo(MAX_CARDINALITY))));
}
}

View File

@ -16,6 +16,7 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import java.util.Collections;
@ -98,7 +99,7 @@ class AggregatorHandleTest {
assertThat(testAggregator.recordedLong.get()).isEqualTo(22);
assertThat(testAggregator.recordedDouble.get()).isEqualTo(0);
testAggregator.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
testAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
assertThat(testAggregator.recordedLong.get()).isEqualTo(0);
assertThat(testAggregator.recordedDouble.get()).isEqualTo(0);
@ -106,7 +107,7 @@ class AggregatorHandleTest {
assertThat(testAggregator.recordedLong.get()).isEqualTo(0);
assertThat(testAggregator.recordedDouble.get()).isEqualTo(33.55);
testAggregator.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
testAggregator.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
assertThat(testAggregator.recordedLong.get()).isEqualTo(0);
assertThat(testAggregator.recordedDouble.get()).isEqualTo(0);
}
@ -148,12 +149,12 @@ class AggregatorHandleTest {
testAggregator.recordDouble(1.0, Attributes.empty(), Context.root());
Mockito.when(doubleReservoir.collectAndReset(attributes))
.thenReturn(Collections.singletonList(result));
testAggregator.accumulateThenMaybeReset(attributes, /* reset= */ true);
testAggregator.aggregateThenMaybeReset(0, 1, attributes, /* reset= */ true);
assertThat(testAggregator.recordedExemplars.get()).containsExactly(result);
}
private static class TestAggregatorHandle<T extends ExemplarData>
extends AggregatorHandle<Void, T> {
extends AggregatorHandle<PointData, T> {
final AtomicLong recordedLong = new AtomicLong();
final AtomicDouble recordedDouble = new AtomicDouble();
final AtomicReference<List<T>> recordedExemplars = new AtomicReference<>();
@ -164,7 +165,12 @@ class AggregatorHandleTest {
@Nullable
@Override
protected Void doAccumulateThenMaybeReset(List<T> exemplars, boolean reset) {
protected PointData doAggregateThenMaybeReset(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<T> exemplars,
boolean reset) {
recordedLong.set(0);
recordedDouble.set(0);
recordedExemplars.set(exemplars);

View File

@ -16,18 +16,22 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@ -40,6 +44,8 @@ class DoubleExplicitBucketHistogramAggregatorTest {
@Mock ExemplarReservoir<DoubleExemplarData> reservoir;
private static final double[] boundaries = new double[] {10.0, 100.0, 1000.0};
private static final List<Double> boundariesList =
DoubleStream.of(boundaries).boxed().collect(Collectors.toList());
private static final Resource RESOURCE = Resource.getDefault();
private static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO =
InstrumentationScopeInfo.empty();
@ -56,20 +62,28 @@ class DoubleExplicitBucketHistogramAggregatorTest {
@Test
void testRecordings() {
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordLong(20);
aggregatorHandle.recordLong(5);
aggregatorHandle.recordLong(150);
aggregatorHandle.recordLong(2000);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
2175, /* hasMinMax= */ true, 5d, 2000d, new long[] {1, 1, 1, 1}));
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
2175,
5d,
2000d,
boundariesList,
Arrays.asList(1L, 1L, 1L, 1L)));
}
@Test
void testExemplarsInAccumulation() {
void aggregateThenMaybeReset_WithExemplars() {
Attributes attributes = Attributes.builder().put("test", "value").build();
DoubleExemplarData exemplar =
ImmutableDoubleExemplarData.create(
@ -85,54 +99,70 @@ class DoubleExplicitBucketHistogramAggregatorTest {
Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars);
DoubleExplicitBucketHistogramAggregator aggregator =
new DoubleExplicitBucketHistogramAggregator(boundaries, () -> reservoir);
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
0, /* hasMinMax= */ true, 0, 0, new long[] {1, 0, 0, 0}, exemplars));
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
0,
0.0,
0.0,
boundariesList,
Arrays.asList(1L, 0L, 0L, 0L),
exemplars));
}
@Test
void toAccumulationAndReset() {
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
void aggregateThenMaybeReset() {
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(100);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
100, /* hasMinMax= */ true, 100d, 100d, new long[] {0, 1, 0, 0}));
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
100,
100d,
100d,
boundariesList,
Arrays.asList(0L, 1L, 0L, 0L)));
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(0);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
0, /* hasMinMax= */ true, 0d, 0d, new long[] {1, 0, 0, 0}));
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
0,
0d,
0d,
boundariesList,
Arrays.asList(1L, 0L, 0L, 0L)));
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void accumulateData() {
assertThat(aggregator.accumulateDoubleMeasurement(11.1, Attributes.empty(), Context.current()))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
11.1, /* hasMinMax= */ true, 11.1, 11.1, new long[] {0, 1, 0, 0}));
assertThat(aggregator.accumulateLongMeasurement(10, Attributes.empty(), Context.current()))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
10.0, /* hasMinMax= */ true, 10.0, 10.0, new long[] {1, 0, 0, 0}));
}
@Test
void toMetricData() {
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordLong(10);
@ -141,13 +171,10 @@ class DoubleExplicitBucketHistogramAggregatorTest {
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(
Attributes.empty(),
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)),
AggregationTemporality.DELTA,
0,
10,
100);
Collections.singletonList(
aggregatorHandle.aggregateThenMaybeReset(
0, 1, Attributes.empty(), /* reset= */ true)),
AggregationTemporality.DELTA);
assertThat(metricData).isNotNull();
assertThat(metricData.getType()).isEqualTo(MetricDataType.HISTOGRAM);
assertThat(metricData.getHistogramData().getAggregationTemporality())
@ -167,24 +194,24 @@ class DoubleExplicitBucketHistogramAggregatorTest {
TraceFlags.getDefault(),
TraceState.getDefault()),
1);
ExplicitBucketHistogramAccumulation accumulation =
ExplicitBucketHistogramAccumulation.create(
HistogramPointData histPoint =
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
2,
/* hasMinMax= */ true,
2d,
2d,
new long[] {1, 0, 0, 0},
boundariesList,
Arrays.asList(1L, 0L, 0L, 0L),
Collections.singletonList(exemplar));
assertThat(
aggregator.toMetricData(
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(Attributes.empty(), accumulation),
AggregationTemporality.CUMULATIVE,
0,
10,
100))
Collections.singletonList(histPoint),
AggregationTemporality.CUMULATIVE))
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
@ -200,32 +227,18 @@ class DoubleExplicitBucketHistogramAggregatorTest {
@Test
void testHistogramCounts() {
assertThat(
aggregator
.accumulateDoubleMeasurement(1.1, Attributes.empty(), Context.root())
.getCounts()
.length)
.isEqualTo(boundaries.length + 1);
assertThat(
aggregator
.accumulateLongMeasurement(1, Attributes.empty(), Context.root())
.getCounts()
.length)
.isEqualTo(boundaries.length + 1);
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(1.1);
ExplicitBucketHistogramAccumulation explicitBucketHistogramAccumulation =
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
assertThat(explicitBucketHistogramAccumulation).isNotNull();
assertThat(explicitBucketHistogramAccumulation.getCounts().length)
.isEqualTo(boundaries.length + 1);
HistogramPointData point =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
assertThat(point).isNotNull();
assertThat(point.getCounts().size()).isEqualTo(boundaries.length + 1);
}
@Test
void testMultithreadedUpdates() throws InterruptedException {
AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<HistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
ImmutableList<Long> updates = ImmutableList.of(1L, 2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L);
int numberOfThreads = updates.size();
@ -242,16 +255,24 @@ class DoubleExplicitBucketHistogramAggregatorTest {
for (int j = 0; j < numberOfUpdates; j++) {
aggregatorHandle.recordLong(v);
if (ThreadLocalRandom.current().nextInt(10) == 0) {
aggregatorHandle.accumulateThenMaybeReset(
Attributes.empty(), /* reset= */ false);
aggregatorHandle.aggregateThenMaybeReset(
0, 1, Attributes.empty(), /* reset= */ false);
}
}
}))
.collect(Collectors.toList()));
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ false))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ false))
.isEqualTo(
ExplicitBucketHistogramAccumulation.create(
1010000, /* hasMinMax= */ true, 1d, 23d, new long[] {50000, 50000, 0, 0}));
ImmutableHistogramPointData.create(
0,
1,
Attributes.empty(),
1010000,
1d,
23d,
boundariesList,
Arrays.asList(50000L, 50000L, 0L, 0L)));
}
}

View File

@ -17,6 +17,7 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
@ -68,34 +69,25 @@ class DoubleExponentialHistogramAggregatorTest {
return (int) Math.ceil(Math.log(value) * scaleFactor) - 1;
}
private static ExponentialHistogramAccumulation getTestAccumulation(
List<DoubleExemplarData> exemplars, double... recordings) {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
for (double r : recordings) {
aggregatorHandle.recordDouble(r);
}
return aggregatorHandle.doAccumulateThenMaybeReset(exemplars, /* reset= */ true);
}
@Test
void createHandle() {
AggregatorHandle<?, ?> handle = aggregator.createHandle();
assertThat(handle).isInstanceOf(DoubleExponentialHistogramAggregator.Handle.class);
ExponentialHistogramAccumulation accumulation =
ExponentialHistogramPointData point =
((DoubleExponentialHistogramAggregator.Handle) handle)
.doAccumulateThenMaybeReset(Collections.emptyList(), /* reset= */ true);
assertThat(accumulation.getPositiveBuckets())
.doAggregateThenMaybeReset(
0, 1, Attributes.empty(), Collections.emptyList(), /* reset= */ true);
assertThat(point.getPositiveBuckets())
.isInstanceOf(DoubleExponentialHistogramAggregator.EmptyExponentialHistogramBuckets.class);
assertThat(accumulation.getPositiveBuckets().getScale()).isEqualTo(MAX_SCALE);
assertThat(accumulation.getNegativeBuckets())
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(MAX_SCALE);
assertThat(point.getNegativeBuckets())
.isInstanceOf(DoubleExponentialHistogramAggregator.EmptyExponentialHistogramBuckets.class);
assertThat(accumulation.getNegativeBuckets().getScale()).isEqualTo(MAX_SCALE);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(MAX_SCALE);
}
@Test
void testRecordings() {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(0.5);
aggregatorHandle.recordDouble(1.0);
@ -108,28 +100,29 @@ class DoubleExponentialHistogramAggregatorTest {
aggregatorHandle.recordDouble(0.0);
aggregatorHandle.recordLong(0);
ExponentialHistogramAccumulation acc =
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
List<Long> positiveCounts = Objects.requireNonNull(acc).getPositiveBuckets().getBucketCounts();
List<Long> negativeCounts = acc.getNegativeBuckets().getBucketCounts();
ExponentialHistogramPointData point =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
List<Long> positiveCounts =
Objects.requireNonNull(point).getPositiveBuckets().getBucketCounts();
List<Long> negativeCounts = point.getNegativeBuckets().getBucketCounts();
int expectedScale = 5; // should be downscaled from 20 to 5 after recordings
assertThat(acc.getScale()).isEqualTo(expectedScale);
assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(expectedScale);
assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(expectedScale);
assertThat(acc.getZeroCount()).isEqualTo(2);
assertThat(point.getScale()).isEqualTo(expectedScale);
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(expectedScale);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(expectedScale);
assertThat(point.getZeroCount()).isEqualTo(2);
// Assert positive recordings are at correct index
int posOffset = acc.getPositiveBuckets().getOffset();
assertThat(acc.getPositiveBuckets().getTotalCount()).isEqualTo(5);
int posOffset = point.getPositiveBuckets().getOffset();
assertThat(point.getPositiveBuckets().getTotalCount()).isEqualTo(5);
assertThat(positiveCounts.get(valueToIndex(expectedScale, 0.5) - posOffset)).isEqualTo(1);
assertThat(positiveCounts.get(valueToIndex(expectedScale, 1.0) - posOffset)).isEqualTo(1);
assertThat(positiveCounts.get(valueToIndex(expectedScale, 12.0) - posOffset)).isEqualTo(2);
assertThat(positiveCounts.get(valueToIndex(expectedScale, 15.213) - posOffset)).isEqualTo(1);
// Assert negative recordings are at correct index
int negOffset = acc.getNegativeBuckets().getOffset();
assertThat(acc.getNegativeBuckets().getTotalCount()).isEqualTo(3);
int negOffset = point.getNegativeBuckets().getOffset();
assertThat(point.getNegativeBuckets().getTotalCount()).isEqualTo(3);
assertThat(negativeCounts.get(valueToIndex(expectedScale, 13.2) - negOffset)).isEqualTo(1);
assertThat(negativeCounts.get(valueToIndex(expectedScale, 2.01) - negOffset)).isEqualTo(1);
assertThat(negativeCounts.get(valueToIndex(expectedScale, 1.0) - negOffset)).isEqualTo(1);
@ -137,45 +130,45 @@ class DoubleExponentialHistogramAggregatorTest {
@Test
void testInvalidRecording() {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
// Non finite recordings should be ignored
aggregatorHandle.recordDouble(Double.POSITIVE_INFINITY);
aggregatorHandle.recordDouble(Double.NEGATIVE_INFINITY);
aggregatorHandle.recordDouble(Double.NaN);
ExponentialHistogramAccumulation acc =
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
assertThat(Objects.requireNonNull(acc).getSum()).isEqualTo(0);
assertThat(acc.getPositiveBuckets().getTotalCount()).isEqualTo(0);
assertThat(acc.getNegativeBuckets().getTotalCount()).isEqualTo(0);
assertThat(acc.getZeroCount()).isEqualTo(0);
ExponentialHistogramPointData point =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
assertThat(Objects.requireNonNull(point).getSum()).isEqualTo(0);
assertThat(point.getPositiveBuckets().getTotalCount()).isEqualTo(0);
assertThat(point.getNegativeBuckets().getTotalCount()).isEqualTo(0);
assertThat(point.getZeroCount()).isEqualTo(0);
}
@ParameterizedTest
@MethodSource("provideAggregator")
void testRecordingsAtLimits(DoubleExponentialHistogramAggregator aggregator) {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(Double.MIN_VALUE);
aggregatorHandle.recordDouble(Double.MAX_VALUE);
ExponentialHistogramAccumulation acc =
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
List<Long> bucketCounts = Objects.requireNonNull(acc).getPositiveBuckets().getBucketCounts();
ExponentialHistogramPointData point =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
List<Long> bucketCounts = Objects.requireNonNull(point).getPositiveBuckets().getBucketCounts();
// assert buckets == [1 0 0 0 ... 1]
assertThat(bucketCounts.get(0)).isEqualTo(1);
assertThat(bucketCounts.get(bucketCounts.size() - 1)).isEqualTo(1);
assertThat(bucketCounts.stream().filter(i -> i == 0).count())
.isEqualTo(bucketCounts.size() - 2);
assertThat(acc.getPositiveBuckets().getTotalCount()).isEqualTo(2);
assertThat(point.getPositiveBuckets().getTotalCount()).isEqualTo(2);
// With 160 buckets allowed, minimum scale is -4
assertThat(acc.getScale()).isEqualTo(-4);
assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(-4);
assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(-4);
assertThat(point.getScale()).isEqualTo(-4);
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(-4);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(-4);
// if scale is -4, base is 65,536.
int base = 65_536;
@ -185,20 +178,20 @@ class DoubleExponentialHistogramAggregatorTest {
// lowest bucket
// As the bucket lower bound is less than Double.MIN_VALUE, Math.pow() rounds to 0
assertThat(Math.pow(base, acc.getPositiveBuckets().getOffset())).isEqualTo(0);
assertThat(Math.pow(base, acc.getPositiveBuckets().getOffset() + 1))
assertThat(Math.pow(base, point.getPositiveBuckets().getOffset())).isEqualTo(0);
assertThat(Math.pow(base, point.getPositiveBuckets().getOffset() + 1))
.isGreaterThan(Double.MIN_VALUE);
// highest bucket
assertThat(Math.pow(base, acc.getPositiveBuckets().getOffset() + bucketCounts.size() - 1))
assertThat(Math.pow(base, point.getPositiveBuckets().getOffset() + bucketCounts.size() - 1))
.isLessThanOrEqualTo(Double.MAX_VALUE);
// As the bucket upper bound is greater than Double.MAX_VALUE, Math.pow() rounds to infinity
assertThat(Math.pow(base, acc.getPositiveBuckets().getOffset() + bucketCounts.size()))
assertThat(Math.pow(base, point.getPositiveBuckets().getOffset() + bucketCounts.size()))
.isEqualTo(Double.POSITIVE_INFINITY);
}
@Test
void testExemplarsInAccumulation() {
void aggregateThenMaybeReset_WithExemplars() {
DoubleExponentialHistogramAggregator agg =
new DoubleExponentialHistogramAggregator(() -> reservoir, 160, MAX_SCALE);
@ -216,48 +209,42 @@ class DoubleExponentialHistogramAggregatorTest {
List<DoubleExemplarData> exemplars = Collections.singletonList(exemplar);
Mockito.when(reservoir.collectAndReset(Attributes.empty())).thenReturn(exemplars);
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
agg.createHandle();
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(
Objects.requireNonNull(
aggregatorHandle.accumulateThenMaybeReset(
Attributes.empty(), /* reset= */ true))
aggregatorHandle.aggregateThenMaybeReset(
0, 1, Attributes.empty(), /* reset= */ true))
.getExemplars())
.isEqualTo(exemplars);
}
@Test
void testAccumulationAndReset() {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
void aggregateThenMaybeReset() {
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordDouble(5.0);
assertThat(
Objects.requireNonNull(
aggregatorHandle.accumulateThenMaybeReset(
Attributes.empty(), /* reset= */ true))
aggregatorHandle.aggregateThenMaybeReset(
0, 1, Attributes.empty(), /* reset= */ true))
.getPositiveBuckets()
.getBucketCounts())
.isEqualTo(Collections.singletonList(1L));
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void testAccumulateData() {
ExponentialHistogramAccumulation acc =
aggregator.accumulateDoubleMeasurement(1.2, Attributes.empty(), Context.current());
ExponentialHistogramAccumulation expected = getTestAccumulation(Collections.emptyList(), 1.2);
assertThat(acc).isEqualTo(expected);
}
@Test
void testInsert1M() {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> handle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> handle =
aggregator.createHandle();
int n = 1024 * 1024 - 1;
@ -268,14 +255,14 @@ class DoubleExponentialHistogramAggregatorTest {
d += min;
}
ExponentialHistogramAccumulation acc =
ExponentialHistogramPointData point =
Objects.requireNonNull(
handle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true));
assertThat(acc.getScale()).isEqualTo(3);
assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(3);
assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(3);
assertThat(acc.getPositiveBuckets().getBucketCounts().size()).isEqualTo(160);
assertThat(acc.getPositiveBuckets().getTotalCount()).isEqualTo(n);
handle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true));
assertThat(point.getScale()).isEqualTo(3);
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(3);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(3);
assertThat(point.getPositiveBuckets().getBucketCounts().size()).isEqualTo(160);
assertThat(point.getPositiveBuckets().getTotalCount()).isEqualTo(n);
}
@Test
@ -293,14 +280,14 @@ class DoubleExponentialHistogramAggregatorTest {
handle.recordDouble(4.0);
handle.recordDouble(16.0);
ExponentialHistogramAccumulation acc =
ExponentialHistogramPointData point =
Objects.requireNonNull(
handle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true));
assertThat(acc.getScale()).isEqualTo(0);
assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(0);
assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(0);
ExponentialHistogramBuckets buckets = acc.getPositiveBuckets();
assertThat(acc.getSum()).isEqualTo(23.5);
handle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true));
assertThat(point.getScale()).isEqualTo(0);
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(0);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(0);
ExponentialHistogramBuckets buckets = point.getPositiveBuckets();
assertThat(point.getSum()).isEqualTo(23.5);
assertThat(buckets.getOffset()).isEqualTo(-2);
assertThat(buckets.getBucketCounts()).isEqualTo(Arrays.asList(1L, 1L, 1L, 1L, 0L, 1L));
assertThat(buckets.getTotalCount()).isEqualTo(5);
@ -329,24 +316,21 @@ class DoubleExponentialHistogramAggregatorTest {
DoubleExponentialHistogramAggregator cumulativeAggregator =
new DoubleExponentialHistogramAggregator(reservoirSupplier, 160, MAX_SCALE);
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
cumulativeAggregator.createHandle();
aggregatorHandle.recordDouble(0);
aggregatorHandle.recordDouble(0);
aggregatorHandle.recordDouble(123.456);
ExponentialHistogramAccumulation acc =
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true);
ExponentialHistogramPointData expPoint =
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true);
MetricData metricDataCumulative =
cumulativeAggregator.toMetricData(
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(Attributes.empty(), acc),
AggregationTemporality.CUMULATIVE,
0,
10,
100);
Collections.singletonList(expPoint),
AggregationTemporality.CUMULATIVE);
// Assertions run twice to verify immutability; recordings shouldn't modify the metric data
for (int i = 0; i < 2; i++) {
@ -386,11 +370,8 @@ class DoubleExponentialHistogramAggregatorTest {
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(Attributes.empty(), acc),
AggregationTemporality.DELTA,
0,
10,
100);
Collections.singletonList(expPoint),
AggregationTemporality.DELTA);
assertThat(metricDataDelta.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM);
assertThat(metricDataDelta.getExponentialHistogramData().getAggregationTemporality())
.isEqualTo(AggregationTemporality.DELTA);
@ -398,7 +379,7 @@ class DoubleExponentialHistogramAggregatorTest {
@Test
void testMultithreadedUpdates() throws InterruptedException {
AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
ImmutableList<Double> updates = ImmutableList.of(0D, 0.1D, -0.1D, 1D, -1D, 100D);
int numberOfThreads = updates.size();
@ -415,47 +396,52 @@ class DoubleExponentialHistogramAggregatorTest {
for (int j = 0; j < numberOfUpdates; j++) {
aggregatorHandle.recordDouble(v);
if (ThreadLocalRandom.current().nextInt(10) == 0) {
aggregatorHandle.accumulateThenMaybeReset(
Attributes.empty(), /* reset= */ false);
aggregatorHandle.aggregateThenMaybeReset(
0, 1, Attributes.empty(), /* reset= */ false);
}
}
}))
.collect(Collectors.toList()));
ExponentialHistogramAccumulation acc =
ExponentialHistogramPointData point =
Objects.requireNonNull(
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ false));
assertThat(acc.getZeroCount()).isEqualTo(numberOfUpdates);
assertThat(acc.getSum()).isCloseTo(100.0D * 10000, Offset.offset(0.0001)); // float error
assertThat(acc.getScale()).isEqualTo(3);
assertThat(acc.getPositiveBuckets().getScale()).isEqualTo(3);
assertThat(acc.getNegativeBuckets().getScale()).isEqualTo(3);
ExponentialHistogramBuckets positiveBuckets = acc.getPositiveBuckets();
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ false));
assertThat(point.getZeroCount()).isEqualTo(numberOfUpdates);
assertThat(point.getSum()).isCloseTo(100.0D * 10000, Offset.offset(0.0001)); // float error
assertThat(point.getScale()).isEqualTo(3);
assertThat(point.getPositiveBuckets().getScale()).isEqualTo(3);
assertThat(point.getNegativeBuckets().getScale()).isEqualTo(3);
ExponentialHistogramBuckets positiveBuckets = point.getPositiveBuckets();
assertThat(positiveBuckets.getTotalCount()).isEqualTo(numberOfUpdates * 3);
assertThat(positiveBuckets.getOffset()).isEqualTo(-27);
ExponentialHistogramBuckets negativeBuckets = acc.getNegativeBuckets();
ExponentialHistogramBuckets negativeBuckets = point.getNegativeBuckets();
assertThat(negativeBuckets.getTotalCount()).isEqualTo(numberOfUpdates * 2);
assertThat(negativeBuckets.getOffset()).isEqualTo(-27);
// Verify positive buckets have correct counts
List<Long> posCounts = acc.getPositiveBuckets().getBucketCounts();
List<Long> posCounts = point.getPositiveBuckets().getBucketCounts();
assertThat(
posCounts.get(valueToIndex(acc.getScale(), 0.1) - acc.getPositiveBuckets().getOffset()))
posCounts.get(
valueToIndex(point.getScale(), 0.1) - point.getPositiveBuckets().getOffset()))
.isEqualTo(numberOfUpdates);
assertThat(
posCounts.get(valueToIndex(acc.getScale(), 1) - acc.getPositiveBuckets().getOffset()))
posCounts.get(
valueToIndex(point.getScale(), 1) - point.getPositiveBuckets().getOffset()))
.isEqualTo(numberOfUpdates);
assertThat(
posCounts.get(valueToIndex(acc.getScale(), 100) - acc.getPositiveBuckets().getOffset()))
posCounts.get(
valueToIndex(point.getScale(), 100) - point.getPositiveBuckets().getOffset()))
.isEqualTo(numberOfUpdates);
// Verify negative buckets have correct counts
List<Long> negCounts = acc.getNegativeBuckets().getBucketCounts();
List<Long> negCounts = point.getNegativeBuckets().getBucketCounts();
assertThat(
negCounts.get(valueToIndex(acc.getScale(), 0.1) - acc.getPositiveBuckets().getOffset()))
negCounts.get(
valueToIndex(point.getScale(), 0.1) - point.getPositiveBuckets().getOffset()))
.isEqualTo(numberOfUpdates);
assertThat(
negCounts.get(valueToIndex(acc.getScale(), 1) - acc.getPositiveBuckets().getOffset()))
negCounts.get(
valueToIndex(point.getScale(), 1) - point.getPositiveBuckets().getOffset()))
.isEqualTo(numberOfUpdates);
}
}

View File

@ -14,8 +14,10 @@ import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.resources.Resource;
@ -40,51 +42,54 @@ class DoubleLastValueAggregatorTest {
@Test
void multipleRecords() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(12.1);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12.1);
aggregatorHandle.recordDouble(13.1);
aggregatorHandle.recordDouble(14.1);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(14.1);
}
@Test
void toAccumulationAndReset() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
void aggregateThenMaybeReset() {
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordDouble(13.1);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(13.1);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordDouble(12.1);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12.1);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void diffAccumulation() {
void diff() {
Attributes attributes = Attributes.builder().put("test", "value").build();
DoubleExemplarData exemplar =
ImmutableDoubleExemplarData.create(
@ -108,17 +113,18 @@ class DoubleLastValueAggregatorTest {
TraceFlags.getDefault(),
TraceState.getDefault()),
2));
DoubleAccumulation result =
DoublePointData result =
aggregator.diff(
DoubleAccumulation.create(1, previousExemplars),
DoubleAccumulation.create(2, exemplars));
ImmutableDoublePointData.create(0, 1, Attributes.empty(), 1, previousExemplars),
ImmutableDoublePointData.create(0, 1, Attributes.empty(), 2, exemplars));
// Assert that latest measurement is kept.
assertThat(result).isEqualTo(DoubleAccumulation.create(2, exemplars));
assertThat(result)
.isEqualTo(ImmutableDoublePointData.create(0, 1, Attributes.empty(), 2, exemplars));
}
@Test
void toMetricData() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(10);
@ -127,13 +133,10 @@ class DoubleLastValueAggregatorTest {
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(
Attributes.empty(),
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)),
AggregationTemporality.DELTA,
0,
10,
100);
Collections.singletonList(
aggregatorHandle.aggregateThenMaybeReset(
10, 100, Attributes.empty(), /* reset= */ true)),
AggregationTemporality.DELTA);
assertThat(metricData)
.hasName("name")
.hasDescription("description")

View File

@ -17,8 +17,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
@ -59,7 +61,7 @@ class DoubleSumAggregatorTest {
@Test
void multipleRecords() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(12.1);
aggregatorHandle.recordDouble(12.1);
@ -68,14 +70,14 @@ class DoubleSumAggregatorTest {
aggregatorHandle.recordDouble(12.1);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12.1 * 5);
}
@Test
void multipleRecords_WithNegatives() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(12);
aggregatorHandle.recordDouble(12);
@ -85,41 +87,44 @@ class DoubleSumAggregatorTest {
aggregatorHandle.recordDouble(-11);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(14);
}
@Test
void toAccumulationAndReset() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
void aggregateThenMaybeReset() {
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordDouble(13);
aggregatorHandle.recordDouble(12);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(25);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordDouble(12);
aggregatorHandle.recordDouble(-25);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(-13);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void testExemplarsInAccumulation() {
void aggregateThenMaybeReset_WithExemplars() {
Attributes attributes = Attributes.builder().put("test", "value").build();
DoubleExemplarData exemplar =
ImmutableDoubleExemplarData.create(
@ -142,11 +147,12 @@ class DoubleSumAggregatorTest {
InstrumentType.COUNTER,
InstrumentValueType.DOUBLE),
() -> reservoir);
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(0, attributes, Context.root());
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
.isEqualTo(DoubleAccumulation.create(0, exemplars));
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(ImmutableDoublePointData.create(0, 1, Attributes.empty(), 0, exemplars));
}
@Test
@ -171,9 +177,10 @@ class DoubleSumAggregatorTest {
"name", "description", "unit", instrumentType, InstrumentValueType.LONG),
ExemplarReservoir::doubleNoSamples);
DoubleAccumulation diffed =
DoublePointData diffed =
aggregator.diff(
DoubleAccumulation.create(1d), DoubleAccumulation.create(2d, exemplars));
ImmutableDoublePointData.create(0, 1, Attributes.empty(), 1d),
ImmutableDoublePointData.create(0, 1, Attributes.empty(), 2d, exemplars));
assertThat(diffed.getValue())
.withFailMessage(
"Invalid diff result for instrumentType %s, temporality %s: %s",
@ -186,7 +193,7 @@ class DoubleSumAggregatorTest {
@Test
void toMetricData() {
AggregatorHandle<DoubleAccumulation, DoubleExemplarData> aggregatorHandle =
AggregatorHandle<DoublePointData, DoubleExemplarData> aggregatorHandle =
aggregator.createHandle();
aggregatorHandle.recordDouble(10);
@ -195,13 +202,10 @@ class DoubleSumAggregatorTest {
resource,
scope,
metricDescriptor,
Collections.singletonMap(
Attributes.empty(),
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE,
0,
10,
100);
Collections.singletonList(
aggregatorHandle.aggregateThenMaybeReset(
0, 100, Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE);
assertThat(metricData)
.hasName("name")
.hasDescription("description")
@ -232,18 +236,15 @@ class DoubleSumAggregatorTest {
TraceFlags.getDefault(),
TraceState.getDefault()),
1);
DoubleAccumulation accumulation =
DoubleAccumulation.create(1, Collections.singletonList(exemplar));
assertThat(
aggregator.toMetricData(
resource,
scope,
metricDescriptor,
Collections.singletonMap(Attributes.empty(), accumulation),
AggregationTemporality.CUMULATIVE,
0,
10,
100))
Collections.singletonList(
ImmutableDoublePointData.create(
0, 1, Attributes.empty(), 1, Collections.singletonList(exemplar))),
AggregationTemporality.CUMULATIVE))
.hasDoubleSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasValue(1).hasExemplars(exemplar)));
}

View File

@ -11,6 +11,7 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
@ -38,53 +39,53 @@ class LongLastValueAggregatorTest {
@Test
void multipleRecords() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(12);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12L);
aggregatorHandle.recordLong(13);
aggregatorHandle.recordLong(14);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(14L);
}
@Test
void toAccumulationAndReset() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
void aggregateThenMaybeReset() {
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(13);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(13L);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(12);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12L);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void toMetricData() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(10);
MetricData metricData =
@ -92,13 +93,10 @@ class LongLastValueAggregatorTest {
RESOURCE,
INSTRUMENTATION_SCOPE_INFO,
METRIC_DESCRIPTOR,
Collections.singletonMap(
Attributes.empty(),
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE,
2,
10,
100);
Collections.singletonList(
aggregatorHandle.aggregateThenMaybeReset(
2, 100, Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE);
assertThat(metricData)
.isEqualTo(
ImmutableMetricData.createLongGauge(

View File

@ -17,8 +17,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
@ -58,8 +60,7 @@ class LongSumAggregatorTest {
@Test
void multipleRecords() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(12);
aggregatorHandle.recordLong(12);
aggregatorHandle.recordLong(12);
@ -67,17 +68,17 @@ class LongSumAggregatorTest {
aggregatorHandle.recordLong(12);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(12 * 5);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void multipleRecords_WithNegatives() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(12);
aggregatorHandle.recordLong(12);
aggregatorHandle.recordLong(-23);
@ -86,43 +87,46 @@ class LongSumAggregatorTest {
aggregatorHandle.recordLong(-11);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(14);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void toAccumulationAndReset() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
void aggregateThenMaybeReset() {
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(13);
aggregatorHandle.recordLong(12);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(25);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
aggregatorHandle.recordLong(12);
aggregatorHandle.recordLong(-25);
assertThat(
aggregatorHandle
.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)
.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true)
.getValue())
.isEqualTo(-13);
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isNull();
}
@Test
void testExemplarsInAccumulation() {
void aggregateThenMaybeReset_WithExemplars() {
Attributes attributes = Attributes.builder().put("test", "value").build();
LongExemplarData exemplar =
ImmutableLongExemplarData.create(
@ -145,11 +149,11 @@ class LongSumAggregatorTest {
InstrumentType.COUNTER,
InstrumentValueType.LONG),
() -> reservoir);
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(0, attributes, Context.root());
assertThat(aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true))
.isEqualTo(LongAccumulation.create(0, exemplars));
assertThat(
aggregatorHandle.aggregateThenMaybeReset(0, 1, Attributes.empty(), /* reset= */ true))
.isEqualTo(ImmutableLongPointData.create(0, 1, Attributes.empty(), 0, exemplars));
}
@Test
@ -173,8 +177,10 @@ class LongSumAggregatorTest {
"name", "description", "unit", instrumentType, InstrumentValueType.LONG),
ExemplarReservoir::longNoSamples);
LongAccumulation diffed =
aggregator.diff(LongAccumulation.create(1L), LongAccumulation.create(2L, exemplars));
LongPointData diffed =
aggregator.diff(
ImmutableLongPointData.create(0, 1, Attributes.empty(), 1L),
ImmutableLongPointData.create(0, 1, Attributes.empty(), 2L, exemplars));
assertThat(diffed.getValue())
.withFailMessage(
"Invalid diff result for instrumentType %s, temporality %s: %s",
@ -188,8 +194,7 @@ class LongSumAggregatorTest {
@Test
@SuppressWarnings("unchecked")
void toMetricData() {
AggregatorHandle<LongAccumulation, LongExemplarData> aggregatorHandle =
aggregator.createHandle();
AggregatorHandle<LongPointData, LongExemplarData> aggregatorHandle = aggregator.createHandle();
aggregatorHandle.recordLong(10);
MetricData metricData =
@ -197,13 +202,10 @@ class LongSumAggregatorTest {
resource,
library,
metricDescriptor,
Collections.singletonMap(
Attributes.empty(),
aggregatorHandle.accumulateThenMaybeReset(Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE,
0,
10,
100);
Collections.singletonList(
aggregatorHandle.aggregateThenMaybeReset(
0, 100, Attributes.empty(), /* reset= */ true)),
AggregationTemporality.CUMULATIVE);
assertThat(metricData)
.hasName("name")
.hasDescription("description")
@ -234,17 +236,16 @@ class LongSumAggregatorTest {
TraceFlags.getDefault(),
TraceState.getDefault()),
1);
LongAccumulation accumulation = LongAccumulation.create(1, Collections.singletonList(exemplar));
assertThat(
aggregator.toMetricData(
resource,
library,
metricDescriptor,
Collections.singletonMap(Attributes.empty(), accumulation),
AggregationTemporality.CUMULATIVE,
0,
10,
100))
Collections.singletonList(
ImmutableLongPointData.create(
0, 1, Attributes.empty(), 1, Collections.singletonList(exemplar))),
AggregationTemporality.CUMULATIVE))
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasValue(1).hasExemplars(exemplar)));
}

View File

@ -5,6 +5,8 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static org.mockito.ArgumentMatchers.any;
@ -85,9 +87,12 @@ class AsynchronousMetricStorageTest {
@Test
void recordLong() {
longCounterStorage.recordLong(1, Attributes.builder().put("key", "a").build());
longCounterStorage.recordLong(2, Attributes.builder().put("key", "b").build());
longCounterStorage.recordLong(3, Attributes.builder().put("key", "c").build());
longCounterStorage.record(
longMeasurement(0, 1, 1, Attributes.builder().put("key", "a").build()));
longCounterStorage.record(
longMeasurement(0, 1, 2, Attributes.builder().put("key", "b").build()));
longCounterStorage.record(
longMeasurement(0, 1, 3, Attributes.builder().put("key", "c").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
.satisfies(
@ -107,9 +112,12 @@ class AsynchronousMetricStorageTest {
@Test
void recordDouble() {
doubleCounterStorage.recordDouble(1.1, Attributes.builder().put("key", "a").build());
doubleCounterStorage.recordDouble(2.2, Attributes.builder().put("key", "b").build());
doubleCounterStorage.recordDouble(3.3, Attributes.builder().put("key", "c").build());
doubleCounterStorage.record(
doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "a").build()));
doubleCounterStorage.record(
doubleMeasurement(0, 1, 2.2, Attributes.builder().put("key", "b").build()));
doubleCounterStorage.record(
doubleMeasurement(0, 1, 3.3, Attributes.builder().put("key", "c").build()));
assertThat(doubleCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
.satisfies(
@ -142,7 +150,8 @@ class AsynchronousMetricStorageTest {
InstrumentDescriptor.create(
"name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.LONG));
storage.recordLong(1, Attributes.builder().put("key1", "a").put("key2", "b").build());
storage.record(
longMeasurement(0, 1, 1, Attributes.builder().put("key1", "a").put("key2", "b").build()));
assertThat(storage.collect(resource, scope, 0, testClock.nanoTime()))
.satisfies(
@ -157,23 +166,26 @@ class AsynchronousMetricStorageTest {
}
@Test
void record_MaxAccumulations() {
for (int i = 0; i <= MetricStorage.MAX_ACCUMULATIONS + 1; i++) {
longCounterStorage.recordLong(1, Attributes.builder().put("key" + i, "val").build());
void record_MaxCardinality() {
for (int i = 0; i <= MetricStorage.MAX_CARDINALITY + 1; i++) {
longCounterStorage.record(
longMeasurement(0, 1, 1, Attributes.builder().put("key" + i, "val").build()));
}
assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
.satisfies(
metricData ->
assertThat(metricData.getLongSumData().getPoints())
.hasSize(MetricStorage.MAX_ACCUMULATIONS));
logs.assertContains("Instrument long-counter has exceeded the maximum allowed accumulations");
.hasSize(MetricStorage.MAX_CARDINALITY));
logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality");
}
@Test
void record_DuplicateAttributes() {
longCounterStorage.recordLong(1, Attributes.builder().put("key1", "a").build());
longCounterStorage.recordLong(2, Attributes.builder().put("key1", "a").build());
longCounterStorage.record(
longMeasurement(0, 1, 1, Attributes.builder().put("key1", "a").build()));
longCounterStorage.record(
longMeasurement(0, 1, 2, Attributes.builder().put("key1", "a").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
.satisfies(
@ -191,8 +203,8 @@ class AsynchronousMetricStorageTest {
@Test
void collect_CumulativeReportsCumulativeObservations() {
// Record measurement and collect at time 10
longCounterStorage.recordLong(3, Attributes.empty());
assertThat(longCounterStorage.collect(resource, scope, 0, 10))
longCounterStorage.record(longMeasurement(0, 10, 3, Attributes.empty()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
@ -206,9 +218,10 @@ class AsynchronousMetricStorageTest {
registeredReader.setLastCollectEpochNanos(10);
// Record measurements and collect at time 30
longCounterStorage.recordLong(3, Attributes.empty());
longCounterStorage.recordLong(6, Attributes.builder().put("key", "value1").build());
assertThat(longCounterStorage.collect(resource, scope, 0, 30))
longCounterStorage.record(longMeasurement(0, 30, 3, Attributes.empty()));
longCounterStorage.record(
longMeasurement(0, 30, 6, Attributes.builder().put("key", "value1").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
@ -228,9 +241,10 @@ class AsynchronousMetricStorageTest {
registeredReader.setLastCollectEpochNanos(30);
// Record measurement and collect at time 35
longCounterStorage.recordLong(4, Attributes.empty());
longCounterStorage.recordLong(5, Attributes.builder().put("key", "value2").build());
assertThat(longCounterStorage.collect(resource, scope, 0, 35))
longCounterStorage.record(longMeasurement(0, 35, 4, Attributes.empty()));
longCounterStorage.record(
longMeasurement(0, 35, 5, Attributes.builder().put("key", "value2").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isCumulative()
@ -264,8 +278,8 @@ class AsynchronousMetricStorageTest {
InstrumentValueType.LONG));
// Record measurement and collect at time 10
longCounterStorage.recordLong(3, Attributes.empty());
assertThat(longCounterStorage.collect(resource, scope, 0, 10))
longCounterStorage.record(longMeasurement(0, 10, 3, Attributes.empty()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isDelta()
@ -279,9 +293,10 @@ class AsynchronousMetricStorageTest {
registeredReader.setLastCollectEpochNanos(10);
// Record measurement and collect at time 30
longCounterStorage.recordLong(3, Attributes.empty());
longCounterStorage.recordLong(6, Attributes.builder().put("key", "value1").build());
assertThat(longCounterStorage.collect(resource, scope, 0, 30))
longCounterStorage.record(longMeasurement(0, 30, 3, Attributes.empty()));
longCounterStorage.record(
longMeasurement(0, 30, 6, Attributes.builder().put("key", "value1").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isDelta()
@ -301,9 +316,10 @@ class AsynchronousMetricStorageTest {
registeredReader.setLastCollectEpochNanos(30);
// Record measurement and collect at time 35
longCounterStorage.recordLong(4, Attributes.empty());
longCounterStorage.recordLong(5, Attributes.builder().put("key", "value2").build());
assertThat(longCounterStorage.collect(resource, scope, 0, 35))
longCounterStorage.record(longMeasurement(0, 35, 4, Attributes.empty()));
longCounterStorage.record(
longMeasurement(0, 35, 5, Attributes.builder().put("key", "value2").build()));
assertThat(longCounterStorage.collect(resource, scope, 0, 0))
.hasLongSumSatisfying(
sum ->
sum.isDelta()

View File

@ -5,11 +5,11 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.doubleMeasurement;
import static io.opentelemetry.sdk.metrics.internal.state.Measurement.longMeasurement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -135,12 +135,13 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement1), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
assertThat(counter.get()).isEqualTo(1.1);
verify(storage1).recordDouble(1.1, Attributes.builder().put("key", "val").build());
verify(storage2, never()).recordDouble(anyDouble(), any());
verify(storage3, never()).recordDouble(anyDouble(), any());
verify(storage1)
.record(doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "val").build()));
verify(storage2, never()).record(any());
verify(storage3, never()).record(any());
}
@Test
@ -153,12 +154,14 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement2), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
assertThat(counter.get()).isEqualTo(1);
verify(storage1, never()).recordLong(anyLong(), any());
verify(storage2).recordLong(1, Attributes.builder().put("key", "val").build());
verify(storage3).recordLong(1, Attributes.builder().put("key", "val").build());
verify(storage1, never()).record(any());
verify(storage2)
.record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build()));
verify(storage3)
.record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build()));
}
@Test
@ -175,13 +178,16 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
assertThat(doubleCounter.get()).isEqualTo(1.1);
assertThat(longCounter.get()).isEqualTo(1);
verify(storage1).recordDouble(1.1, Attributes.builder().put("key", "val").build());
verify(storage2).recordLong(1, Attributes.builder().put("key", "val").build());
verify(storage3).recordLong(1, Attributes.builder().put("key", "val").build());
verify(storage1)
.record(doubleMeasurement(0, 1, 1.1, Attributes.builder().put("key", "val").build()));
verify(storage2)
.record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build()));
verify(storage3)
.record(longMeasurement(0, 1, 1, Attributes.builder().put("key", "val").build()));
}
@Test
@ -197,7 +203,7 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
assertThat(counter.get()).isEqualTo(0);
}
@ -211,11 +217,11 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Arrays.asList(measurement1, measurement2), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
verify(storage1, never()).recordDouble(anyDouble(), any());
verify(storage2, never()).recordDouble(anyDouble(), any());
verify(storage3, never()).recordDouble(anyDouble(), any());
verify(storage1, never()).record(any());
verify(storage2, never()).record(any());
verify(storage3, never()).record(any());
logs.assertContains("An exception occurred invoking callback");
}
@ -228,11 +234,11 @@ class CallbackRegistrationTest {
CallbackRegistration callbackRegistration =
CallbackRegistration.create(Collections.singletonList(measurement2), callback);
callbackRegistration.invokeCallback(registeredReader);
callbackRegistration.invokeCallback(registeredReader, 0, 1);
verify(storage1, never()).recordDouble(anyDouble(), any());
verify(storage2, never()).recordDouble(anyDouble(), any());
verify(storage3, never()).recordDouble(anyDouble(), any());
verify(storage1, never()).record(any());
verify(storage2, never()).record(any());
verify(storage3, never()).record(any());
logs.assertContains("An exception occurred invoking callback");
}

View File

@ -18,6 +18,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
@ -54,7 +55,7 @@ public class SynchronousMetricStorageTest {
private final RegisteredReader cumulativeReader =
RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create());
private final TestClock testClock = TestClock.create();
private final Aggregator<Long, LongExemplarData> aggregator =
private final Aggregator<LongPointData, LongExemplarData> aggregator =
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
private final AttributesProcessor attributesProcessor = AttributesProcessor.noop();
@ -62,10 +63,10 @@ public class SynchronousMetricStorageTest {
@Test
void attributesProcessor_used() {
AttributesProcessor spyAttributesProcessor = Mockito.spy(this.attributesProcessor);
SynchronousMetricStorage accumulator =
SynchronousMetricStorage storage =
new DefaultSynchronousMetricStorage<>(
cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor);
accumulator.bind(Attributes.empty());
storage.bind(Attributes.empty());
Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
@ -75,12 +76,12 @@ public class SynchronousMetricStorageTest {
AttributesProcessor attributesProcessor =
AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build());
AttributesProcessor spyLabelsProcessor = Mockito.spy(attributesProcessor);
SynchronousMetricStorage accumulator =
SynchronousMetricStorage storage =
new DefaultSynchronousMetricStorage<>(
cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyLabelsProcessor);
BoundStorageHandle handle = accumulator.bind(labels);
BoundStorageHandle handle = storage.bind(labels);
handle.recordDouble(1, labels, Context.root());
MetricData md = accumulator.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now());
MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now());
assertThat(md)
.hasDoubleSumSatisfying(
sum ->
@ -197,7 +198,7 @@ public class SynchronousMetricStorageTest {
cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
// Record measurements for max number of attributes
for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS; i++) {
for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
@ -207,7 +208,7 @@ public class SynchronousMetricStorageTest {
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(MetricStorage.MAX_ACCUMULATIONS)
.hasSize(MetricStorage.MAX_CARDINALITY)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(0);
@ -220,7 +221,7 @@ public class SynchronousMetricStorageTest {
// Record measurement for additional attribute, exceeding limit
storage.recordDouble(
3,
Attributes.builder().put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1).build(),
Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
Context.current());
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
.hasDoubleSumSatisfying(
@ -228,7 +229,7 @@ public class SynchronousMetricStorageTest {
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(MetricStorage.MAX_ACCUMULATIONS)
.hasSize(MetricStorage.MAX_CARDINALITY)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(0);
@ -240,8 +241,8 @@ public class SynchronousMetricStorageTest {
point
.getAttributes()
.get(AttributeKey.stringKey("key"))
.equals("value" + MetricStorage.MAX_ACCUMULATIONS + 1))));
logs.assertContains("Instrument name has exceeded the maximum allowed accumulations");
.equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@Test
@ -251,7 +252,7 @@ public class SynchronousMetricStorageTest {
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
// Record measurements for max number of attributes
for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS; i++) {
for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
@ -261,7 +262,7 @@ public class SynchronousMetricStorageTest {
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(MetricStorage.MAX_ACCUMULATIONS)
.hasSize(MetricStorage.MAX_CARDINALITY)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(0);
@ -274,7 +275,7 @@ public class SynchronousMetricStorageTest {
// Record measurement for additional attribute, should not exceed limit due to reset
storage.recordDouble(
3,
Attributes.builder().put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1).build(),
Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
Context.current());
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
.hasDoubleSumSatisfying(
@ -288,13 +289,13 @@ public class SynchronousMetricStorageTest {
.hasValue(3)
.hasAttributes(
Attributes.builder()
.put("key", "value" + MetricStorage.MAX_ACCUMULATIONS + 1)
.put("key", "value" + MetricStorage.MAX_CARDINALITY + 1)
.build())));
assertThat(logs.getEvents()).isEmpty();
deltaReader.setLastCollectEpochNanos(20);
// Record measurements exceeding max number of attributes. Last measurement should be dropped
for (int i = 0; i < MetricStorage.MAX_ACCUMULATIONS + 1; i++) {
for (int i = 0; i < MetricStorage.MAX_CARDINALITY + 1; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
@ -304,7 +305,7 @@ public class SynchronousMetricStorageTest {
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(MetricStorage.MAX_ACCUMULATIONS)
.hasSize(MetricStorage.MAX_CARDINALITY)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(20);
@ -316,7 +317,7 @@ public class SynchronousMetricStorageTest {
point
.getAttributes()
.get(AttributeKey.stringKey("key"))
.equals("value" + MetricStorage.MAX_ACCUMULATIONS + 1))));
logs.assertContains("Instrument name has exceeded the maximum allowed accumulations");
.equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
}