Metric cardinality limits (#3831)

* Drop stale streams, make metrics minimum collection interval configurable

* Limit number of accumulations in async and sync metric storage

* PR feedback
This commit is contained in:
jack-berg 2021-11-09 11:46:14 -06:00 committed by GitHub
parent 8f2b21b5ff
commit fc68a88be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 541 additions and 25 deletions

View File

@ -45,11 +45,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
private final Map<CollectionHandle, CollectionInfo> collectionInfoMap;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;
// Minimum amount of time we allow between synchronous collections.
// This meant to reduce overhead when multiple exporters attempt to read metrics quickly.
// TODO: This should be configurable at the SDK level.
private static final long MINIMUM_COLLECTION_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private final long minimumCollectionIntervalNanos;
/**
* Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}.
@ -65,14 +61,16 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
Clock clock,
Resource resource,
ViewRegistry viewRegistry,
ExemplarFilter exemplarSampler) {
ExemplarFilter exemplarSampler,
long minimumCollectionIntervalNanos) {
this.sharedState =
MeterProviderSharedState.create(clock, resource, viewRegistry, exemplarSampler);
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo));
this.lastCollectionTimestamp =
new AtomicLong(clock.nanoTime() - MINIMUM_COLLECTION_INTERVAL_NANOS);
new AtomicLong(clock.nanoTime() - minimumCollectionIntervalNanos);
this.minimumCollectionIntervalNanos = minimumCollectionIntervalNanos;
// Here we construct our own unique handle ids for this SDK.
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
@ -148,7 +146,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
long pastNanoTime = lastCollectionTimestamp.get();
// It hasn't been long enough since the last collection.
boolean disableSynchronousCollection =
(currentNanoTime - pastNanoTime) < MINIMUM_COLLECTION_INTERVAL_NANOS;
(currentNanoTime - pastNanoTime) < minimumCollectionIntervalNanos;
// If we're not disabling metrics, write the current collection time.
// We don't care if this happens in more than one thread, suppression is optimistic, and the
// interval is small enough some jitter isn't important.

View File

@ -5,6 +5,8 @@
package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
@ -15,9 +17,11 @@ import io.opentelemetry.sdk.metrics.internal.view.ViewRegistryBuilder;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Builder class for the {@link SdkMeterProvider}. Has fully functional default implementations of
@ -31,6 +35,7 @@ public final class SdkMeterProviderBuilder {
private final List<MetricReaderFactory> metricReaders = new ArrayList<>();
// Default the sampling strategy.
private ExemplarFilter exemplarFilter = ExemplarFilter.sampleWithTraces();
private long minimumCollectionIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
SdkMeterProviderBuilder() {}
@ -123,6 +128,20 @@ public final class SdkMeterProviderBuilder {
return this;
}
/**
* Configure the minimum duration between synchronous collections. If collections occur more
* frequently than this, synchronous collection will be suppressed.
*
* @param duration The duration.
* @return this
*/
public SdkMeterProviderBuilder setMinimumCollectionInterval(Duration duration) {
Objects.requireNonNull(duration, "duration");
checkArgument(!duration.isNegative(), "duration must not be negative");
minimumCollectionIntervalNanos = duration.toNanos();
return this;
}
/**
* Returns a new {@link SdkMeterProvider} built with the configuration of this {@link
* SdkMeterProviderBuilder}. This provider is not registered as the global {@link
@ -135,6 +154,11 @@ public final class SdkMeterProviderBuilder {
*/
public SdkMeterProvider build() {
return new SdkMeterProvider(
metricReaders, clock, resource, viewRegistryBuilder.build(), exemplarFilter);
metricReaders,
clock,
resource,
viewRegistryBuilder.build(),
exemplarFilter,
minimumCollectionIntervalNanos);
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
@ -24,6 +25,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@ -33,6 +36,8 @@ import javax.annotation.Nullable;
* at any time.
*/
public final class AsynchronousMetricStorage<T> implements MetricStorage {
private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(DeltaMetricStorage.class.getName()));
private final MetricDescriptor metricDescriptor;
private final ReentrantLock collectLock = new ReentrantLock();
private final AsyncAccumulator<T> asyncAccumulator;
@ -56,7 +61,7 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
Aggregator<T> aggregator =
view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample());
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>();
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
if (Aggregator.empty() == aggregator) {
return empty();
}
@ -90,7 +95,7 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
final MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
Aggregator<T> aggregator =
view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample());
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>();
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
final AttributesProcessor attributesProcessor = view.getAttributesProcessor();
// TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
final ObservableLongMeasurement result =
@ -159,9 +164,24 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
/** Helper class to record async measurements on demand. */
private static final class AsyncAccumulator<T> {
private final InstrumentDescriptor instrument;
private Map<Attributes, T> currentAccumulation = new HashMap<>();
AsyncAccumulator(InstrumentDescriptor instrument) {
this.instrument = instrument;
}
public void record(Attributes attributes, T accumulation) {
if (currentAccumulation.size() >= MetricStorageUtils.MAX_ACCUMULATIONS) {
logger.log(
Level.WARNING,
"Instrument "
+ instrument.getName()
+ " has exceeded the maximum allowed accumulations ("
+ MetricStorageUtils.MAX_ACCUMULATIONS
+ ").");
return;
}
// TODO: error on metric overwrites
currentAccumulation.put(attributes, accumulation);
}

View File

@ -37,7 +37,8 @@ public final class DefaultSynchronousMetricStorage<T> implements SynchronousMetr
AttributesProcessor attributesProcessor) {
this.attributesProcessor = attributesProcessor;
this.metricDescriptor = metricDescriptor;
this.deltaMetricStorage = new DeltaMetricStorage<>(aggregator);
this.deltaMetricStorage =
new DeltaMetricStorage<>(aggregator, metricDescriptor.getSourceInstrument());
this.temporalMetricStorage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ true);
}

View File

@ -5,9 +5,14 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.metrics.internal.state.MetricStorageUtils.MAX_ACCUMULATIONS;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import java.util.ArrayList;
import java.util.HashMap;
@ -15,6 +20,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
/**
@ -25,13 +32,20 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
class DeltaMetricStorage<T> {
private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(DeltaMetricStorage.class.getName()));
private static final BoundStorageHandle NOOP_STORAGE_HANDLE = new NoopBoundHandle();
private final Aggregator<T> aggregator;
private final InstrumentDescriptor instrument;
private final ConcurrentHashMap<Attributes, AggregatorHandle<T>> activeCollectionStorage =
new ConcurrentHashMap<>();
private final List<DeltaAccumulation<T>> unreportedDeltas = new ArrayList<>();
DeltaMetricStorage(Aggregator<T> aggregator) {
DeltaMetricStorage(Aggregator<T> aggregator, InstrumentDescriptor instrument) {
this.aggregator = aggregator;
this.instrument = instrument;
}
/**
@ -47,9 +61,19 @@ class DeltaMetricStorage<T> {
return aggregatorHandle;
}
// Missing entry or no longer mapped, try to add a new entry.
// Missing entry or no longer mapped. Try to add a new one if not exceeded cardinality limits.
aggregatorHandle = aggregator.createHandle();
while (true) {
if (activeCollectionStorage.size() >= MAX_ACCUMULATIONS) {
logger.log(
Level.WARNING,
"Instrument "
+ instrument.getName()
+ " has exceeded the maximum allowed accumulations ("
+ MAX_ACCUMULATIONS
+ ").");
return NOOP_STORAGE_HANDLE;
}
AggregatorHandle<?> boundAggregatorHandle =
activeCollectionStorage.putIfAbsent(attributes, aggregatorHandle);
if (boundAggregatorHandle != null) {
@ -121,4 +145,17 @@ class DeltaMetricStorage<T> {
unreportedDeltas.add(new DeltaAccumulation<>(result));
}
}
/** An implementation of {@link BoundStorageHandle} that does not record. */
private static class NoopBoundHandle implements BoundStorageHandle {
@Override
public void recordLong(long value, Attributes attributes, Context context) {}
@Override
public void recordDouble(double value, Attributes attributes, Context context) {}
@Override
public void release() {}
}
}

View File

@ -11,15 +11,20 @@ import java.util.Map;
/** Utilities to help deal w/ {@code Map<Attributes, Accumulation>} in metric storage. */
final class MetricStorageUtils {
/** The max number of metric accumulations for a particular {@link MetricStorage}. */
static final int MAX_ACCUMULATIONS = 2000;
private MetricStorageUtils() {}
/**
* Merges accumulations from {@code toMerge} into {@code result}.
* Merges accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which
* don't appear in {@code toMerge} are removed.
*
* <p>Note: This mutates the result map.
*/
static <T> void mergeInPlace(
Map<Attributes, T> result, Map<Attributes, T> toMerge, Aggregator<T> aggregator) {
result.entrySet().removeIf(entry -> !toMerge.containsKey(entry.getKey()));
toMerge.forEach(
(k, v) -> {
result.compute(k, (k2, v2) -> (v2 != null) ? aggregator.merge(v2, v) : v);
@ -27,7 +32,8 @@ final class MetricStorageUtils {
}
/**
* Diffs accumulations from {@code toMerge} into {@code result}.
* Diffs accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which
* don't appear in {@code toMerge} are removed.
*
* <p>If no prior value is found, then the value from {@code toDiff} is used.
*
@ -35,6 +41,7 @@ final class MetricStorageUtils {
*/
static <T> void diffInPlace(
Map<Attributes, T> result, Map<Attributes, T> toDiff, Aggregator<T> aggregator) {
result.entrySet().removeIf(entry -> !toDiff.containsKey(entry.getKey()));
toDiff.forEach(
(k, v) -> {
result.compute(k, (k2, v2) -> (v2 != null) ? aggregator.diff(v2, v) : v);

View File

@ -107,17 +107,16 @@ class TemporalMetricStorage<T> {
/** Remembers what was presented to a specific exporter. */
private static class LastReportedAccumulation<T> {
@Nullable private final Map<Attributes, T> accumulation;
private final Map<Attributes, T> accumulation;
private final long epochNanos;
/**
* Constructs a new reporting record.
*
* @param accumulation The last accumulation of metric data or {@code null} if the accumulator
* is not stateful.
* @param accumulation The last accumulation of metric data.
* @param epochNanos The timestamp the data was reported.
*/
LastReportedAccumulation(@Nullable Map<Attributes, T> accumulation, long epochNanos) {
LastReportedAccumulation(Map<Attributes, T> accumulation, long epochNanos) {
this.accumulation = accumulation;
this.epochNanos = epochNanos;
}
@ -126,7 +125,6 @@ class TemporalMetricStorage<T> {
return epochNanos;
}
@Nullable
Map<Attributes, T> getAccumulation() {
return accumulation;
}

View File

@ -0,0 +1,228 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.sdk.testing.assertj.metrics.MetricAssertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.metrics.testing.InMemoryMetricReader;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class CardinalityTest {
/** Traces {@code MetricStorageUtils#MAX_ACCUMULATIONS}. */
private static final int MAX_ACCUMULATIONS = 2000;
private InMemoryMetricReader deltaReader;
private InMemoryMetricReader cumulativeReader;
private Meter meter;
@BeforeEach
void setup() {
deltaReader = InMemoryMetricReader.createDelta();
cumulativeReader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder()
.registerMetricReader(deltaReader)
.registerMetricReader(cumulativeReader)
.setMinimumCollectionInterval(Duration.ofSeconds(0))
.build();
meter = sdkMeterProvider.get(CardinalityTest.class.getName());
}
/**
* Records to sync instruments, with distinct attributes each time. Validates that stale metrics
* are dropped for delta and cumulative readers. Stale metrics are those with attributes that did
* not receive recordings in the most recent collection.
*/
@Test
void staleMetricsDropped_synchronousInstrument() {
LongCounter syncCounter = meter.counterBuilder("sync-counter").build();
for (int i = 1; i <= 5; i++) {
syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build());
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection " + i)
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasName("sync-counter")
.hasLongSum()
.isDelta()
.points()
.hasSize(1));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection " + i)
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasName("sync-counter")
.hasLongSum()
.isCumulative()
.points()
.hasSize(1));
}
}
/**
* Records to async instruments, with distinct attributes each time. Validates that stale metrics
* are dropped for delta and cumulative readers. Stale metrics are those with attributes that did
* not receive recordings in the most recent collection.
*/
@Test
void staleMetricsDropped_asynchronousInstrument() {
AtomicLong count = new AtomicLong();
meter
.counterBuilder("async-counter")
.buildWithCallback(
measurement ->
measurement.observe(
1, Attributes.builder().put("key", "num_" + count.incrementAndGet()).build()));
for (int i = 1; i <= 5; i++) {
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection " + i)
.hasSize(1)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("async-counter")
.hasLongSum()
.isDelta()
.points()
.hasSize(1));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection " + i)
.hasSize(1)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("async-counter")
.hasLongSum()
.isCumulative()
.points()
.hasSize(1));
}
}
/**
* Records to sync instruments, many distinct attributes. Validates that the {@code
* MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument.
*/
@Test
void cardinalityLimits_synchronousInstrument() {
LongCounter syncCounter1 = meter.counterBuilder("sync-counter1").build();
LongCounter syncCounter2 = meter.counterBuilder("sync-counter2").build();
for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) {
syncCounter1.add(1, Attributes.builder().put("key", "value" + i).build());
syncCounter2.add(1, Attributes.builder().put("key", "value" + i).build());
}
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("sync-counter1")
.hasLongSum()
.isDelta()
.points()
.hasSize(MAX_ACCUMULATIONS),
metricData ->
assertThat(metricData)
.hasName("sync-counter2")
.hasLongSum()
.isDelta()
.points()
.hasSize(MAX_ACCUMULATIONS));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("sync-counter1")
.hasLongSum()
.isCumulative()
.points()
.hasSize(MAX_ACCUMULATIONS),
metricData ->
assertThat(metricData)
.hasName("sync-counter2")
.hasLongSum()
.isCumulative()
.points()
.hasSize(MAX_ACCUMULATIONS));
}
/**
* Records to sync instruments, many distinct attributes. Validates that the {@code
* MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument.
*/
@Test
void cardinalityLimits_asynchronousInstrument() {
Consumer<ObservableLongMeasurement> callback =
measurement -> {
for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) {
measurement.observe(1, Attributes.builder().put("key", "value" + i).build());
}
};
meter.counterBuilder("async-counter1").buildWithCallback(callback);
meter.counterBuilder("async-counter2").buildWithCallback(callback);
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("async-counter1")
.hasLongSum()
.isDelta()
.points()
.hasSize(MAX_ACCUMULATIONS),
metricData ->
assertThat(metricData)
.hasName("async-counter2")
.hasLongSum()
.isDelta()
.points()
.hasSize(MAX_ACCUMULATIONS));
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection")
.hasSize(2)
.satisfiesExactlyInAnyOrder(
metricData ->
assertThat(metricData)
.hasName("async-counter1")
.hasLongSum()
.isCumulative()
.points()
.hasSize(MAX_ACCUMULATIONS),
metricData ->
assertThat(metricData)
.hasName("async-counter2")
.hasLongSum()
.isCumulative()
.points()
.hasSize(MAX_ACCUMULATIONS));
}
}

View File

@ -41,7 +41,8 @@ class DeltaMetricStorageTest {
allCollectors.add(collector2);
storage =
new DeltaMetricStorage<>(
Aggregation.sum().createAggregator(DESCRIPTOR, ExemplarFilter.neverSample()));
Aggregation.sum().createAggregator(DESCRIPTOR, ExemplarFilter.neverSample()),
DESCRIPTOR);
}
@Test

View File

@ -12,6 +12,7 @@ import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleAccumulation;
@ -137,6 +138,104 @@ class TemporalMetricStorageTest {
point -> assertThat(point).hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8));
}
@Test
void synchronousCumulative_dropsStale() {
TemporalMetricStorage<DoubleAccumulation> storage =
new TemporalMetricStorage<>(SUM, /* isSynchronous= */ true);
// Send in new measurement at time 10 for collector 1, with attr1
Map<Attributes, DoubleAccumulation> measurement1 = new HashMap<>();
Attributes attr1 = Attributes.builder().put("key", "value1").build();
measurement1.put(attr1, DoubleAccumulation.create(3));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.CUMULATIVE,
measurement1,
0,
10))
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.isNotEmpty()
.contains(DoublePointData.create(0, 10, attr1, 3));
// Send in new measurement at time 20 for collector 1, with attr2
// Result should drop accumulation for attr1, only reporting accumulation for attr2
Map<Attributes, DoubleAccumulation> measurement2 = new HashMap<>();
Attributes attr2 = Attributes.builder().put("key", "value2").build();
measurement2.put(attr2, DoubleAccumulation.create(7));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.CUMULATIVE,
measurement2,
0,
20))
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.isNotEmpty()
.containsExactly(DoublePointData.create(0, 20, attr2, 7));
}
@Test
void synchronousDelta_dropsStale() {
TemporalMetricStorage<DoubleAccumulation> storage =
new TemporalMetricStorage<>(SUM, /* isSynchronous= */ true);
// Send in new measurement at time 10 for collector 1, with attr1
Map<Attributes, DoubleAccumulation> measurement1 = new HashMap<>();
Attributes attr1 = Attributes.builder().put("key", "value1").build();
measurement1.put(attr1, DoubleAccumulation.create(3));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.DELTA,
measurement1,
0,
10))
.hasDoubleSum()
.isDelta()
.points()
.hasSize(1)
.isNotEmpty()
.contains(DoublePointData.create(0, 10, attr1, 3));
// Send in new measurement at time 20 for collector 1, with attr2
// Result should drop accumulation for attr1, only reporting accumulation for attr2
Map<Attributes, DoubleAccumulation> measurement2 = new HashMap<>();
Attributes attr2 = Attributes.builder().put("key", "value2").build();
measurement2.put(attr2, DoubleAccumulation.create(7));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.DELTA,
measurement2,
0,
20))
.hasDoubleSum()
.isDelta()
.points()
.hasSize(1)
.isNotEmpty()
.containsExactly(DoublePointData.create(10, 20, attr2, 7));
}
@Test
void synchronousDelta_useLastTimestamp() {
AggregationTemporality temporality = AggregationTemporality.DELTA;
@ -378,6 +477,104 @@ class TemporalMetricStorageTest {
point -> assertThat(point).hasStartEpochNanos(0).hasEpochNanos(35).hasValue(2));
}
@Test
void asynchronousCumulative_dropsStale() {
TemporalMetricStorage<DoubleAccumulation> storage =
new TemporalMetricStorage<>(ASYNC_SUM, /* isSynchronous= */ false);
// Send in new measurement at time 10 for collector 1, with attr1
Map<Attributes, DoubleAccumulation> measurement1 = new HashMap<>();
Attributes attr1 = Attributes.builder().put("key", "value1").build();
measurement1.put(attr1, DoubleAccumulation.create(3));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.CUMULATIVE,
measurement1,
0,
10))
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.isNotEmpty()
.contains(DoublePointData.create(0, 10, attr1, 3));
// Send in new measurement at time 20 for collector 1, with attr2
// Result should drop accumulation for attr1, only reporting accumulation for attr2
Map<Attributes, DoubleAccumulation> measurement2 = new HashMap<>();
Attributes attr2 = Attributes.builder().put("key", "value2").build();
measurement2.put(attr2, DoubleAccumulation.create(7));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.CUMULATIVE,
measurement2,
0,
20))
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.isNotEmpty()
.containsExactly(DoublePointData.create(0, 20, attr2, 7));
}
@Test
void asynchronousDelta_dropsStale() {
TemporalMetricStorage<DoubleAccumulation> storage =
new TemporalMetricStorage<>(ASYNC_SUM, /* isSynchronous= */ false);
// Send in new measurement at time 10 for collector 1, with attr1
Map<Attributes, DoubleAccumulation> measurement1 = new HashMap<>();
Attributes attr1 = Attributes.builder().put("key", "value1").build();
measurement1.put(attr1, DoubleAccumulation.create(3));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.DELTA,
measurement1,
0,
10))
.hasDoubleSum()
.isDelta()
.points()
.hasSize(1)
.isNotEmpty()
.contains(DoublePointData.create(0, 10, attr1, 3));
// Send in new measurement at time 20 for collector 1, with attr2
// Result should drop accumulation for attr1, only reporting accumulation for attr2
Map<Attributes, DoubleAccumulation> measurement2 = new HashMap<>();
Attributes attr2 = Attributes.builder().put("key", "value2").build();
measurement2.put(attr2, DoubleAccumulation.create(7));
assertThat(
storage.buildMetricFor(
collector1,
Resource.empty(),
InstrumentationLibraryInfo.empty(),
METRIC_DESCRIPTOR,
AggregationTemporality.DELTA,
measurement2,
0,
20))
.hasDoubleSum()
.isDelta()
.points()
.hasSize(1)
.isNotEmpty()
.containsExactly(DoublePointData.create(10, 20, attr2, 7));
}
@Test
void asynchronousDelta_diffsLastTimestamp() {
AggregationTemporality temporality = AggregationTemporality.DELTA;

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.testing;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import java.time.Duration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -19,7 +20,11 @@ class InMemoryMetricReaderCumulativeTest {
@BeforeEach
void setup() {
reader = InMemoryMetricReader.create();
provider = SdkMeterProvider.builder().registerMetricReader(reader).build();
provider =
SdkMeterProvider.builder()
.setMinimumCollectionInterval(Duration.ofSeconds(0))
.registerMetricReader(reader)
.build();
}
private void generateFakeMetric(int index) {
@ -45,7 +50,7 @@ class InMemoryMetricReaderCumulativeTest {
// Add more data, should join.
generateFakeMetric(1);
assertThat(reader.collectAllMetrics()).hasSize(3);
assertThat(reader.collectAllMetrics()).hasSize(1);
}
@Test
@ -55,7 +60,7 @@ class InMemoryMetricReaderCumulativeTest {
generateFakeMetric(3);
// TODO: Better assertions for CompletableResultCode.
assertThat(reader.flush()).isNotNull();
assertThat(reader.collectAllMetrics()).hasSize(3);
assertThat(reader.collectAllMetrics()).hasSize(0);
}
@Test