diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index fd96bf3b30..b4b2e2f87e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -45,11 +45,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { private final Map collectionInfoMap; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicLong lastCollectionTimestamp; - - // Minimum amount of time we allow between synchronous collections. - // This meant to reduce overhead when multiple exporters attempt to read metrics quickly. - // TODO: This should be configurable at the SDK level. - private static final long MINIMUM_COLLECTION_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100); + private final long minimumCollectionIntervalNanos; /** * Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}. @@ -65,14 +61,16 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { Clock clock, Resource resource, ViewRegistry viewRegistry, - ExemplarFilter exemplarSampler) { + ExemplarFilter exemplarSampler, + long minimumCollectionIntervalNanos) { this.sharedState = MeterProviderSharedState.create(clock, resource, viewRegistry, exemplarSampler); this.registry = new ComponentRegistry<>( instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo)); this.lastCollectionTimestamp = - new AtomicLong(clock.nanoTime() - MINIMUM_COLLECTION_INTERVAL_NANOS); + new AtomicLong(clock.nanoTime() - minimumCollectionIntervalNanos); + this.minimumCollectionIntervalNanos = minimumCollectionIntervalNanos; // Here we construct our own unique handle ids for this SDK. // These are guaranteed to be unique per-reader for this SDK, and only this SDK. @@ -148,7 +146,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { long pastNanoTime = lastCollectionTimestamp.get(); // It hasn't been long enough since the last collection. boolean disableSynchronousCollection = - (currentNanoTime - pastNanoTime) < MINIMUM_COLLECTION_INTERVAL_NANOS; + (currentNanoTime - pastNanoTime) < minimumCollectionIntervalNanos; // If we're not disabling metrics, write the current collection time. // We don't care if this happens in more than one thread, suppression is optimistic, and the // interval is small enough some jitter isn't important. diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java index fb0cdd4b34..1e889b134d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java @@ -5,6 +5,8 @@ package io.opentelemetry.sdk.metrics; +import static io.opentelemetry.api.internal.Utils.checkArgument; + import io.opentelemetry.api.metrics.GlobalMeterProvider; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter; @@ -15,9 +17,11 @@ import io.opentelemetry.sdk.metrics.internal.view.ViewRegistryBuilder; import io.opentelemetry.sdk.metrics.view.InstrumentSelector; import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * Builder class for the {@link SdkMeterProvider}. Has fully functional default implementations of @@ -31,6 +35,7 @@ public final class SdkMeterProviderBuilder { private final List metricReaders = new ArrayList<>(); // Default the sampling strategy. private ExemplarFilter exemplarFilter = ExemplarFilter.sampleWithTraces(); + private long minimumCollectionIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100); SdkMeterProviderBuilder() {} @@ -123,6 +128,20 @@ public final class SdkMeterProviderBuilder { return this; } + /** + * Configure the minimum duration between synchronous collections. If collections occur more + * frequently than this, synchronous collection will be suppressed. + * + * @param duration The duration. + * @return this + */ + public SdkMeterProviderBuilder setMinimumCollectionInterval(Duration duration) { + Objects.requireNonNull(duration, "duration"); + checkArgument(!duration.isNegative(), "duration must not be negative"); + minimumCollectionIntervalNanos = duration.toNanos(); + return this; + } + /** * Returns a new {@link SdkMeterProvider} built with the configuration of this {@link * SdkMeterProviderBuilder}. This provider is not registered as the global {@link @@ -135,6 +154,11 @@ public final class SdkMeterProviderBuilder { */ public SdkMeterProvider build() { return new SdkMeterProvider( - metricReaders, clock, resource, viewRegistryBuilder.build(), exemplarFilter); + metricReaders, + clock, + resource, + viewRegistryBuilder.build(), + exemplarFilter, + minimumCollectionIntervalNanos); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 95408ca455..3c25cd4341 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -10,6 +10,7 @@ import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter; @@ -24,6 +25,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -33,6 +36,8 @@ import javax.annotation.Nullable; * at any time. */ public final class AsynchronousMetricStorage implements MetricStorage { + private static final ThrottlingLogger logger = + new ThrottlingLogger(Logger.getLogger(DeltaMetricStorage.class.getName())); private final MetricDescriptor metricDescriptor; private final ReentrantLock collectLock = new ReentrantLock(); private final AsyncAccumulator asyncAccumulator; @@ -56,7 +61,7 @@ public final class AsynchronousMetricStorage implements MetricStorage { Aggregator aggregator = view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample()); - final AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(); + final AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(instrument); if (Aggregator.empty() == aggregator) { return empty(); } @@ -90,7 +95,7 @@ public final class AsynchronousMetricStorage implements MetricStorage { final MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument); Aggregator aggregator = view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample()); - final AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(); + final AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(instrument); final AttributesProcessor attributesProcessor = view.getAttributesProcessor(); // TODO: Find a way to grab the measurement JUST ONCE for all async metrics. final ObservableLongMeasurement result = @@ -159,9 +164,24 @@ public final class AsynchronousMetricStorage implements MetricStorage { /** Helper class to record async measurements on demand. */ private static final class AsyncAccumulator { + private final InstrumentDescriptor instrument; private Map currentAccumulation = new HashMap<>(); + AsyncAccumulator(InstrumentDescriptor instrument) { + this.instrument = instrument; + } + public void record(Attributes attributes, T accumulation) { + if (currentAccumulation.size() >= MetricStorageUtils.MAX_ACCUMULATIONS) { + logger.log( + Level.WARNING, + "Instrument " + + instrument.getName() + + " has exceeded the maximum allowed accumulations (" + + MetricStorageUtils.MAX_ACCUMULATIONS + + ")."); + return; + } // TODO: error on metric overwrites currentAccumulation.put(attributes, accumulation); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 557ffc07e2..8186d79fa9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -37,7 +37,8 @@ public final class DefaultSynchronousMetricStorage implements SynchronousMetr AttributesProcessor attributesProcessor) { this.attributesProcessor = attributesProcessor; this.metricDescriptor = metricDescriptor; - this.deltaMetricStorage = new DeltaMetricStorage<>(aggregator); + this.deltaMetricStorage = + new DeltaMetricStorage<>(aggregator, metricDescriptor.getSourceInstrument()); this.temporalMetricStorage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ true); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java index aabb792282..4f56d8af15 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java @@ -5,9 +5,14 @@ package io.opentelemetry.sdk.metrics.internal.state; +import static io.opentelemetry.sdk.metrics.internal.state.MetricStorageUtils.MAX_ACCUMULATIONS; + import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; +import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle; import java.util.ArrayList; import java.util.HashMap; @@ -15,6 +20,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.concurrent.ThreadSafe; /** @@ -25,13 +32,20 @@ import javax.annotation.concurrent.ThreadSafe; */ @ThreadSafe class DeltaMetricStorage { + + private static final ThrottlingLogger logger = + new ThrottlingLogger(Logger.getLogger(DeltaMetricStorage.class.getName())); + private static final BoundStorageHandle NOOP_STORAGE_HANDLE = new NoopBoundHandle(); + private final Aggregator aggregator; + private final InstrumentDescriptor instrument; private final ConcurrentHashMap> activeCollectionStorage = new ConcurrentHashMap<>(); private final List> unreportedDeltas = new ArrayList<>(); - DeltaMetricStorage(Aggregator aggregator) { + DeltaMetricStorage(Aggregator aggregator, InstrumentDescriptor instrument) { this.aggregator = aggregator; + this.instrument = instrument; } /** @@ -47,9 +61,19 @@ class DeltaMetricStorage { return aggregatorHandle; } - // Missing entry or no longer mapped, try to add a new entry. + // Missing entry or no longer mapped. Try to add a new one if not exceeded cardinality limits. aggregatorHandle = aggregator.createHandle(); while (true) { + if (activeCollectionStorage.size() >= MAX_ACCUMULATIONS) { + logger.log( + Level.WARNING, + "Instrument " + + instrument.getName() + + " has exceeded the maximum allowed accumulations (" + + MAX_ACCUMULATIONS + + ")."); + return NOOP_STORAGE_HANDLE; + } AggregatorHandle boundAggregatorHandle = activeCollectionStorage.putIfAbsent(attributes, aggregatorHandle); if (boundAggregatorHandle != null) { @@ -121,4 +145,17 @@ class DeltaMetricStorage { unreportedDeltas.add(new DeltaAccumulation<>(result)); } } + + /** An implementation of {@link BoundStorageHandle} that does not record. */ + private static class NoopBoundHandle implements BoundStorageHandle { + + @Override + public void recordLong(long value, Attributes attributes, Context context) {} + + @Override + public void recordDouble(double value, Attributes attributes, Context context) {} + + @Override + public void release() {} + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java index e67af3e70d..a5cbb22f7a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageUtils.java @@ -11,15 +11,20 @@ import java.util.Map; /** Utilities to help deal w/ {@code Map} in metric storage. */ final class MetricStorageUtils { + /** The max number of metric accumulations for a particular {@link MetricStorage}. */ + static final int MAX_ACCUMULATIONS = 2000; + private MetricStorageUtils() {} /** - * Merges accumulations from {@code toMerge} into {@code result}. + * Merges accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which + * don't appear in {@code toMerge} are removed. * *

Note: This mutates the result map. */ static void mergeInPlace( Map result, Map toMerge, Aggregator aggregator) { + result.entrySet().removeIf(entry -> !toMerge.containsKey(entry.getKey())); toMerge.forEach( (k, v) -> { result.compute(k, (k2, v2) -> (v2 != null) ? aggregator.merge(v2, v) : v); @@ -27,7 +32,8 @@ final class MetricStorageUtils { } /** - * Diffs accumulations from {@code toMerge} into {@code result}. + * Diffs accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which + * don't appear in {@code toMerge} are removed. * *

If no prior value is found, then the value from {@code toDiff} is used. * @@ -35,6 +41,7 @@ final class MetricStorageUtils { */ static void diffInPlace( Map result, Map toDiff, Aggregator aggregator) { + result.entrySet().removeIf(entry -> !toDiff.containsKey(entry.getKey())); toDiff.forEach( (k, v) -> { result.compute(k, (k2, v2) -> (v2 != null) ? aggregator.diff(v2, v) : v); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java index 9f6f6665cd..8cfc80375c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorage.java @@ -107,17 +107,16 @@ class TemporalMetricStorage { /** Remembers what was presented to a specific exporter. */ private static class LastReportedAccumulation { - @Nullable private final Map accumulation; + private final Map accumulation; private final long epochNanos; /** * Constructs a new reporting record. * - * @param accumulation The last accumulation of metric data or {@code null} if the accumulator - * is not stateful. + * @param accumulation The last accumulation of metric data. * @param epochNanos The timestamp the data was reported. */ - LastReportedAccumulation(@Nullable Map accumulation, long epochNanos) { + LastReportedAccumulation(Map accumulation, long epochNanos) { this.accumulation = accumulation; this.epochNanos = epochNanos; } @@ -126,7 +125,6 @@ class TemporalMetricStorage { return epochNanos; } - @Nullable Map getAccumulation() { return accumulation; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java new file mode 100644 index 0000000000..11d7d06159 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java @@ -0,0 +1,228 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics; + +import static io.opentelemetry.sdk.testing.assertj.metrics.MetricAssertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.sdk.metrics.testing.InMemoryMetricReader; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CardinalityTest { + + /** Traces {@code MetricStorageUtils#MAX_ACCUMULATIONS}. */ + private static final int MAX_ACCUMULATIONS = 2000; + + private InMemoryMetricReader deltaReader; + private InMemoryMetricReader cumulativeReader; + private Meter meter; + + @BeforeEach + void setup() { + deltaReader = InMemoryMetricReader.createDelta(); + cumulativeReader = InMemoryMetricReader.create(); + SdkMeterProvider sdkMeterProvider = + SdkMeterProvider.builder() + .registerMetricReader(deltaReader) + .registerMetricReader(cumulativeReader) + .setMinimumCollectionInterval(Duration.ofSeconds(0)) + .build(); + meter = sdkMeterProvider.get(CardinalityTest.class.getName()); + } + + /** + * Records to sync instruments, with distinct attributes each time. Validates that stale metrics + * are dropped for delta and cumulative readers. Stale metrics are those with attributes that did + * not receive recordings in the most recent collection. + */ + @Test + void staleMetricsDropped_synchronousInstrument() { + LongCounter syncCounter = meter.counterBuilder("sync-counter").build(); + for (int i = 1; i <= 5; i++) { + syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build()); + + assertThat(deltaReader.collectAllMetrics()) + .as("Delta collection " + i) + .hasSize(1) + .satisfiesExactly( + metricData -> + assertThat(metricData) + .hasName("sync-counter") + .hasLongSum() + .isDelta() + .points() + .hasSize(1)); + + assertThat(cumulativeReader.collectAllMetrics()) + .as("Cumulative collection " + i) + .hasSize(1) + .satisfiesExactly( + metricData -> + assertThat(metricData) + .hasName("sync-counter") + .hasLongSum() + .isCumulative() + .points() + .hasSize(1)); + } + } + + /** + * Records to async instruments, with distinct attributes each time. Validates that stale metrics + * are dropped for delta and cumulative readers. Stale metrics are those with attributes that did + * not receive recordings in the most recent collection. + */ + @Test + void staleMetricsDropped_asynchronousInstrument() { + AtomicLong count = new AtomicLong(); + + meter + .counterBuilder("async-counter") + .buildWithCallback( + measurement -> + measurement.observe( + 1, Attributes.builder().put("key", "num_" + count.incrementAndGet()).build())); + + for (int i = 1; i <= 5; i++) { + assertThat(deltaReader.collectAllMetrics()) + .as("Delta collection " + i) + .hasSize(1) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("async-counter") + .hasLongSum() + .isDelta() + .points() + .hasSize(1)); + + assertThat(cumulativeReader.collectAllMetrics()) + .as("Cumulative collection " + i) + .hasSize(1) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("async-counter") + .hasLongSum() + .isCumulative() + .points() + .hasSize(1)); + } + } + + /** + * Records to sync instruments, many distinct attributes. Validates that the {@code + * MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument. + */ + @Test + void cardinalityLimits_synchronousInstrument() { + LongCounter syncCounter1 = meter.counterBuilder("sync-counter1").build(); + LongCounter syncCounter2 = meter.counterBuilder("sync-counter2").build(); + for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) { + syncCounter1.add(1, Attributes.builder().put("key", "value" + i).build()); + syncCounter2.add(1, Attributes.builder().put("key", "value" + i).build()); + } + + assertThat(deltaReader.collectAllMetrics()) + .as("Delta collection") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("sync-counter1") + .hasLongSum() + .isDelta() + .points() + .hasSize(MAX_ACCUMULATIONS), + metricData -> + assertThat(metricData) + .hasName("sync-counter2") + .hasLongSum() + .isDelta() + .points() + .hasSize(MAX_ACCUMULATIONS)); + + assertThat(cumulativeReader.collectAllMetrics()) + .as("Cumulative collection") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("sync-counter1") + .hasLongSum() + .isCumulative() + .points() + .hasSize(MAX_ACCUMULATIONS), + metricData -> + assertThat(metricData) + .hasName("sync-counter2") + .hasLongSum() + .isCumulative() + .points() + .hasSize(MAX_ACCUMULATIONS)); + } + + /** + * Records to sync instruments, many distinct attributes. Validates that the {@code + * MetricStorageUtils#MAX_ACCUMULATIONS} is enforced for each instrument. + */ + @Test + void cardinalityLimits_asynchronousInstrument() { + Consumer callback = + measurement -> { + for (int i = 0; i < MAX_ACCUMULATIONS + 1; i++) { + measurement.observe(1, Attributes.builder().put("key", "value" + i).build()); + } + }; + meter.counterBuilder("async-counter1").buildWithCallback(callback); + meter.counterBuilder("async-counter2").buildWithCallback(callback); + + assertThat(deltaReader.collectAllMetrics()) + .as("Delta collection") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("async-counter1") + .hasLongSum() + .isDelta() + .points() + .hasSize(MAX_ACCUMULATIONS), + metricData -> + assertThat(metricData) + .hasName("async-counter2") + .hasLongSum() + .isDelta() + .points() + .hasSize(MAX_ACCUMULATIONS)); + + assertThat(cumulativeReader.collectAllMetrics()) + .as("Cumulative collection") + .hasSize(2) + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("async-counter1") + .hasLongSum() + .isCumulative() + .points() + .hasSize(MAX_ACCUMULATIONS), + metricData -> + assertThat(metricData) + .hasName("async-counter2") + .hasLongSum() + .isCumulative() + .points() + .hasSize(MAX_ACCUMULATIONS)); + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java index ec590621cf..abeb37b916 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java @@ -41,7 +41,8 @@ class DeltaMetricStorageTest { allCollectors.add(collector2); storage = new DeltaMetricStorage<>( - Aggregation.sum().createAggregator(DESCRIPTOR, ExemplarFilter.neverSample())); + Aggregation.sum().createAggregator(DESCRIPTOR, ExemplarFilter.neverSample()), + DESCRIPTOR); } @Test diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java index c21d5d2f6d..76bda59ec8 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/TemporalMetricStorageTest.java @@ -12,6 +12,7 @@ import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleAccumulation; @@ -137,6 +138,104 @@ class TemporalMetricStorageTest { point -> assertThat(point).hasStartEpochNanos(0).hasEpochNanos(35).hasValue(8)); } + @Test + void synchronousCumulative_dropsStale() { + TemporalMetricStorage storage = + new TemporalMetricStorage<>(SUM, /* isSynchronous= */ true); + + // Send in new measurement at time 10 for collector 1, with attr1 + Map measurement1 = new HashMap<>(); + Attributes attr1 = Attributes.builder().put("key", "value1").build(); + measurement1.put(attr1, DoubleAccumulation.create(3)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.CUMULATIVE, + measurement1, + 0, + 10)) + .hasDoubleSum() + .isCumulative() + .points() + .hasSize(1) + .isNotEmpty() + .contains(DoublePointData.create(0, 10, attr1, 3)); + + // Send in new measurement at time 20 for collector 1, with attr2 + // Result should drop accumulation for attr1, only reporting accumulation for attr2 + Map measurement2 = new HashMap<>(); + Attributes attr2 = Attributes.builder().put("key", "value2").build(); + measurement2.put(attr2, DoubleAccumulation.create(7)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.CUMULATIVE, + measurement2, + 0, + 20)) + .hasDoubleSum() + .isCumulative() + .points() + .hasSize(1) + .isNotEmpty() + .containsExactly(DoublePointData.create(0, 20, attr2, 7)); + } + + @Test + void synchronousDelta_dropsStale() { + TemporalMetricStorage storage = + new TemporalMetricStorage<>(SUM, /* isSynchronous= */ true); + + // Send in new measurement at time 10 for collector 1, with attr1 + Map measurement1 = new HashMap<>(); + Attributes attr1 = Attributes.builder().put("key", "value1").build(); + measurement1.put(attr1, DoubleAccumulation.create(3)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.DELTA, + measurement1, + 0, + 10)) + .hasDoubleSum() + .isDelta() + .points() + .hasSize(1) + .isNotEmpty() + .contains(DoublePointData.create(0, 10, attr1, 3)); + + // Send in new measurement at time 20 for collector 1, with attr2 + // Result should drop accumulation for attr1, only reporting accumulation for attr2 + Map measurement2 = new HashMap<>(); + Attributes attr2 = Attributes.builder().put("key", "value2").build(); + measurement2.put(attr2, DoubleAccumulation.create(7)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.DELTA, + measurement2, + 0, + 20)) + .hasDoubleSum() + .isDelta() + .points() + .hasSize(1) + .isNotEmpty() + .containsExactly(DoublePointData.create(10, 20, attr2, 7)); + } + @Test void synchronousDelta_useLastTimestamp() { AggregationTemporality temporality = AggregationTemporality.DELTA; @@ -378,6 +477,104 @@ class TemporalMetricStorageTest { point -> assertThat(point).hasStartEpochNanos(0).hasEpochNanos(35).hasValue(2)); } + @Test + void asynchronousCumulative_dropsStale() { + TemporalMetricStorage storage = + new TemporalMetricStorage<>(ASYNC_SUM, /* isSynchronous= */ false); + + // Send in new measurement at time 10 for collector 1, with attr1 + Map measurement1 = new HashMap<>(); + Attributes attr1 = Attributes.builder().put("key", "value1").build(); + measurement1.put(attr1, DoubleAccumulation.create(3)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.CUMULATIVE, + measurement1, + 0, + 10)) + .hasDoubleSum() + .isCumulative() + .points() + .hasSize(1) + .isNotEmpty() + .contains(DoublePointData.create(0, 10, attr1, 3)); + + // Send in new measurement at time 20 for collector 1, with attr2 + // Result should drop accumulation for attr1, only reporting accumulation for attr2 + Map measurement2 = new HashMap<>(); + Attributes attr2 = Attributes.builder().put("key", "value2").build(); + measurement2.put(attr2, DoubleAccumulation.create(7)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.CUMULATIVE, + measurement2, + 0, + 20)) + .hasDoubleSum() + .isCumulative() + .points() + .hasSize(1) + .isNotEmpty() + .containsExactly(DoublePointData.create(0, 20, attr2, 7)); + } + + @Test + void asynchronousDelta_dropsStale() { + TemporalMetricStorage storage = + new TemporalMetricStorage<>(ASYNC_SUM, /* isSynchronous= */ false); + + // Send in new measurement at time 10 for collector 1, with attr1 + Map measurement1 = new HashMap<>(); + Attributes attr1 = Attributes.builder().put("key", "value1").build(); + measurement1.put(attr1, DoubleAccumulation.create(3)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.DELTA, + measurement1, + 0, + 10)) + .hasDoubleSum() + .isDelta() + .points() + .hasSize(1) + .isNotEmpty() + .contains(DoublePointData.create(0, 10, attr1, 3)); + + // Send in new measurement at time 20 for collector 1, with attr2 + // Result should drop accumulation for attr1, only reporting accumulation for attr2 + Map measurement2 = new HashMap<>(); + Attributes attr2 = Attributes.builder().put("key", "value2").build(); + measurement2.put(attr2, DoubleAccumulation.create(7)); + assertThat( + storage.buildMetricFor( + collector1, + Resource.empty(), + InstrumentationLibraryInfo.empty(), + METRIC_DESCRIPTOR, + AggregationTemporality.DELTA, + measurement2, + 0, + 20)) + .hasDoubleSum() + .isDelta() + .points() + .hasSize(1) + .isNotEmpty() + .containsExactly(DoublePointData.create(10, 20, attr2, 7)); + } + @Test void asynchronousDelta_diffsLastTimestamp() { AggregationTemporality temporality = AggregationTemporality.DELTA; diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/testing/InMemoryMetricReaderCumulativeTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/testing/InMemoryMetricReaderCumulativeTest.java index 4f5eca36f4..6a34c56a59 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/testing/InMemoryMetricReaderCumulativeTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/testing/InMemoryMetricReaderCumulativeTest.java @@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.testing; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import java.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,7 +20,11 @@ class InMemoryMetricReaderCumulativeTest { @BeforeEach void setup() { reader = InMemoryMetricReader.create(); - provider = SdkMeterProvider.builder().registerMetricReader(reader).build(); + provider = + SdkMeterProvider.builder() + .setMinimumCollectionInterval(Duration.ofSeconds(0)) + .registerMetricReader(reader) + .build(); } private void generateFakeMetric(int index) { @@ -45,7 +50,7 @@ class InMemoryMetricReaderCumulativeTest { // Add more data, should join. generateFakeMetric(1); - assertThat(reader.collectAllMetrics()).hasSize(3); + assertThat(reader.collectAllMetrics()).hasSize(1); } @Test @@ -55,7 +60,7 @@ class InMemoryMetricReaderCumulativeTest { generateFakeMetric(3); // TODO: Better assertions for CompletableResultCode. assertThat(reader.flush()).isNotNull(); - assertThat(reader.collectAllMetrics()).hasSize(3); + assertThat(reader.collectAllMetrics()).hasSize(0); } @Test