Memory Mode: Adding first part support for synchronous instruments - storage (#5998)

Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com>
This commit is contained in:
Asaf Mesika 2023-12-15 00:21:39 +02:00 committed by GitHub
parent 6e536238df
commit ffd53c7d56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 532 additions and 50 deletions

View File

@ -17,6 +17,21 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing
* concurrent collections / exports.
*
* <p>Metric Signal: For DELTA aggregation temporality, the memory used for recording and
* aggregating metric values is kept between MetricReader collect operation, to avoid memory
* allocations. When the configured maximum cardinality of Attributes is reached, unused
* Attributes are cleared from memory during collect operation, at the cost of requiring new
* memory allocations the next time those attributes are used. Allocations can be minimized by
* increasing the configured max cardinality. For example, suppose instrumentation has recorded
* values for 1000 unique Attributes while the max cardinality configured was 2000. If after a
* collection only 100 unique Attributes values are recorded, the MetricReader's collect operation
* would return 100 points, while in memory the Attributes data structure keeps 1000 unique
* Attributes. If a user recorded values for 3000 unique attributes, the values for the first 1999
* Attributes would be recorded, and the rest of 1001 unique Attributes values would be recorded
* in the CARDINALITY_OVERFLOW Attributes. If after several collect operations, the user now
* records values to only 500 unique attributes, during collect operation, the unused 1500
* Attributes memory would be cleared from memory.
*/
REUSABLE_DATA,
@ -25,6 +40,9 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK passes immutable objects to exporters / readers, increasing
* allocations but ensuring safe concurrent exports.
*
* <p>Metric Signal: In DELTA aggregation temporality, the memory used for recording and
* aggregating Attributes values is cleared during a MetricReader collect operation.
*/
IMMUTABLE_DATA
}

View File

@ -7,6 +7,7 @@ package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor.setIncludes;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
@ -96,6 +97,7 @@ public final class ViewBuilder {
* <p>Note: not currently stable but additional attribute processors can be configured via {@link
* SdkMeterProviderUtil#appendAllBaggageAttributes(ViewBuilder)}.
*/
@SuppressWarnings("unused")
ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
this.processor = this.processor.then(attributesProcessor);
return this;
@ -105,7 +107,10 @@ public final class ViewBuilder {
* Set the cardinality limit.
*
* <p>Note: not currently stable but cardinality limit can be configured via
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int).
*
* <p>Read {@link MemoryMode} to understand the memory usage behavior of reaching cardinality
* limit.
*
* @param cardinalityLimit the maximum number of series for a metric
*/

View File

@ -28,6 +28,7 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa
// A reservoir of sampled exemplars for this time period.
private final ExemplarReservoir<U> exemplarReservoir;
private volatile boolean valuesRecorded = false;
protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
this.exemplarReservoir = exemplarReservoir;
@ -39,6 +40,10 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa
*/
public final T aggregateThenMaybeReset(
long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) {
if (reset) {
valuesRecorded = false;
}
return doAggregateThenMaybeReset(
startEpochNanos,
epochNanos,
@ -69,6 +74,7 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa
*/
public final void recordLong(long value) {
doRecordLong(value);
valuesRecorded = true;
}
/**
@ -94,6 +100,7 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa
*/
public final void recordDouble(double value) {
doRecordDouble(value);
valuesRecorded = true;
}
/**
@ -104,4 +111,13 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa
throw new UnsupportedOperationException(
"This aggregator does not support recording double values.");
}
/**
* Checks whether this handle has values recorded.
*
* @return True if values has been recorded to it
*/
public boolean hasRecordedValues() {
return valuesRecorded;
}
}

View File

@ -5,9 +5,14 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
@ -50,6 +55,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
private final AttributesProcessor attributesProcessor;
private final MemoryMode memoryMode;
// Only populated if memoryMode == REUSABLE_DATA
private final ArrayList<T> reusableResultList = new ArrayList<>();
// Only populated if memoryMode == REUSABLE_DATA and
// aggregationTemporality is DELTA
private volatile ConcurrentHashMap<Attributes, AggregatorHandle<T, U>>
previousCollectionAggregatorHandles = new ConcurrentHashMap<>();
/**
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
* to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series.
@ -74,6 +89,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality - 1;
this.memoryMode = registeredReader.getReader().getMemoryMode();
}
// Visible for testing
@ -139,7 +155,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
/**
* Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate
* that recording is complete and it is safe to collect.
* that recording is complete, and it is safe to collect.
*/
private void releaseHolderForRecord(AggregatorHolder<T, U> aggregatorHolder) {
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
@ -185,16 +201,20 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
InstrumentationScopeInfo instrumentationScopeInfo,
long startEpochNanos,
long epochNanos) {
boolean reset = aggregationTemporality == AggregationTemporality.DELTA;
boolean reset = aggregationTemporality == DELTA;
long start =
aggregationTemporality == AggregationTemporality.DELTA
aggregationTemporality == DELTA
? registeredReader.getLastCollectEpochNanos()
: startEpochNanos;
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
if (reset) {
AggregatorHolder<T, U> holder = this.aggregatorHolder;
this.aggregatorHolder = new AggregatorHolder<>();
this.aggregatorHolder =
(memoryMode == REUSABLE_DATA)
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
: new AggregatorHolder<>();
// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
// record operations should re-read the volatile this.aggregatorHolder.
// Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
@ -208,15 +228,56 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
}
List<T> points;
if (memoryMode == REUSABLE_DATA) {
reusableResultList.clear();
points = reusableResultList;
} else {
points = new ArrayList<>(aggregatorHandles.size());
}
// In DELTA aggregation temporality each Attributes is reset to 0
// every time we perform a collection (by definition of DELTA).
// In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles
// (into which the values are recorded) effectively starting from 0
// for each recorded Attributes.
// In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing
// a key-value from a map and putting it again on next recording will cost an allocation,
// we are keeping the aggregator handles in their map, and only reset their value once
// we finish collecting the aggregated value from each one.
// The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
// hence during collect(), when the map is at full capacity, we try to clear away unused
// aggregator handles, so on next recording cycle using this map, there will be room for newly
// recorded Attributes. This comes at the expanse of memory allocations. This can be avoided
// if the user chooses to increase the maxCardinality.
if (memoryMode == REUSABLE_DATA && reset) {
if (aggregatorHandles.size() >= maxCardinality) {
aggregatorHandles.forEach(
(attribute, handle) -> {
if (!handle.hasRecordedValues()) {
aggregatorHandles.remove(attribute);
}
});
}
}
// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
aggregatorHandles.forEach(
(attributes, handle) -> {
if (!handle.hasRecordedValues()) {
return;
}
T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
if (reset) {
if (reset && memoryMode == IMMUTABLE_DATA) {
// Return the aggregator to the pool.
// The pool is only used in DELTA temporality (since in CUMULATIVE the handler is
// always used as it is the place accumulating the values and never resets)
// AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid
// using the pool since it allocates memory internally on each put() or remove()
aggregatorHandlePool.offer(handle);
}
if (point != null) {
points.add(point);
}
@ -229,6 +290,10 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
aggregatorHandlePool.poll();
}
if (reset && memoryMode == REUSABLE_DATA) {
previousCollectionAggregatorHandles = aggregatorHandles;
}
if (points.isEmpty()) {
return EmptyMetricData.getInstance();
}
@ -243,8 +308,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
}
private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
// Recording threads grab the current interval (AggregatorHolder) and atomically increment
// this by 2 before recording against it (and then decrement by two when done).
//
@ -260,5 +324,14 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
// all it needs to do is release the "read lock" it just obtained (decrementing by 2),
// and then grab and record against the new current interval (AggregatorHolder).
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);
private AggregatorHolder() {
aggregatorHandles = new ConcurrentHashMap<>();
}
private AggregatorHolder(
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles) {
this.aggregatorHandles = aggregatorHandles;
}
}
}

View File

@ -20,9 +20,11 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
@ -39,6 +41,7 @@ import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.assertj.DoubleSumAssert;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.time.TestClock;
import java.time.Duration;
@ -47,10 +50,12 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;
@ -75,19 +80,36 @@ public class SynchronousMetricStorageTest {
LogCapturer logs =
LogCapturer.create().captureForType(DefaultSynchronousMetricStorage.class, Level.DEBUG);
private final RegisteredReader deltaReader =
RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create());
private final RegisteredReader cumulativeReader =
RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create());
private RegisteredReader deltaReader;
private RegisteredReader cumulativeReader;
private final TestClock testClock = TestClock.create();
private final Aggregator<LongPointData, LongExemplarData> aggregator =
spy(
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()));
private Aggregator<LongPointData, LongExemplarData> aggregator;
private final AttributesProcessor attributesProcessor = AttributesProcessor.noop();
@Test
void recordDouble_NaN() {
private void initialize(MemoryMode memoryMode) {
deltaReader =
RegisteredReader.create(
InMemoryMetricReader.builder()
.setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA)
.setMemoryMode(memoryMode)
.build(),
ViewRegistry.create());
cumulativeReader =
RegisteredReader.create(
InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(),
ViewRegistry.create());
aggregator =
spy(
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()));
}
@ParameterizedTest
@EnumSource(MemoryMode.class)
void recordDouble_NaN(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
cumulativeReader,
@ -105,8 +127,11 @@ public class SynchronousMetricStorageTest {
.isEqualTo(EmptyMetricData.getInstance());
}
@Test
void attributesProcessor_applied() {
@ParameterizedTest
@EnumSource(MemoryMode.class)
void attributesProcessor_applied(MemoryMode memoryMode) {
initialize(memoryMode);
Attributes attributes = Attributes.builder().put("K", "V").build();
AttributesProcessor attributesProcessor =
AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build());
@ -129,8 +154,11 @@ public class SynchronousMetricStorageTest {
attributeEntry("K", "V"), attributeEntry("modifiedK", "modifiedV"))));
}
@Test
void recordAndCollect_CumulativeDoesNotReset() {
@ParameterizedTest
@EnumSource(MemoryMode.class)
void recordAndCollect_CumulativeDoesNotReset(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
cumulativeReader,
@ -176,7 +204,9 @@ public class SynchronousMetricStorageTest {
}
@Test
void recordAndCollect_DeltaResets() {
void recordAndCollect_DeltaResets_ImmutableData() {
initialize(MemoryMode.IMMUTABLE_DATA);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
@ -223,7 +253,107 @@ public class SynchronousMetricStorageTest {
}
@Test
void recordAndCollect_CumulativeAtLimit() {
void recordAndCollect_DeltaResets_ReusableData() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
// Record measurement and collect at time 10
storage.recordDouble(3, Attributes.empty(), Context.current());
verify(aggregator, times(1)).createHandle();
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
deltaReader.setLastCollectEpochNanos(10);
// Record measurement and collect at time 30
storage.recordDouble(3, Attributes.empty(), Context.current());
// We're switched to secondary map so a handle will be created
verify(aggregator, times(2)).createHandle();
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
.hasDoubleSumSatisfying(
sum ->
sum.isDelta()
.hasPointsSatisfying(
point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3)));
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
deltaReader.setLastCollectEpochNanos(30);
// Record measurements and collect at time 35
storage.recordDouble(2, Attributes.empty(), Context.current());
storage.recordDouble(4, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current());
// We don't delete aggregator handles unless max cardinality reached, hence
// aggregator handle is still there, thus no handle was created for empty(), but it will for
// the "foo"
verify(aggregator, times(3)).createHandle();
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35);
assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta);
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(2)
.anySatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(30);
assertThat(point.getEpochNanos()).isEqualTo(35);
assertThat(point.getValue()).isEqualTo(2);
assertThat(point.getAttributes()).isEqualTo(Attributes.empty());
})
.anySatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(30);
assertThat(point.getEpochNanos()).isEqualTo(35);
assertThat(point.getValue()).isEqualTo(4);
assertThat(point.getAttributes())
.isEqualTo(
Attributes.of(AttributeKey.stringKey("foo"), "bar"));
})));
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
deltaReader.setLastCollectEpochNanos(40);
storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current());
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 45))
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(1)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(40);
assertThat(point.getEpochNanos()).isEqualTo(45);
assertThat(point.getValue()).isEqualTo(6);
assertThat(point.getAttributes())
.isEqualTo(
Attributes.of(AttributeKey.stringKey("foo"), "bar"));
})));
}
@ParameterizedTest
@EnumSource(MemoryMode.class)
void recordAndCollect_CumulativeAtLimit(MemoryMode memoryMode) {
initialize(memoryMode);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
cumulativeReader,
@ -292,7 +422,9 @@ public class SynchronousMetricStorageTest {
}
@Test
void recordAndCollect_DeltaAtLimit() {
void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() {
initialize(MemoryMode.IMMUTABLE_DATA);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
@ -319,6 +451,7 @@ public class SynchronousMetricStorageTest {
assertThat(point.getValue()).isEqualTo(3);
})));
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1);
assertThat(logs.getEvents()).isEmpty();
deltaReader.setLastCollectEpochNanos(10);
@ -384,6 +517,227 @@ public class SynchronousMetricStorageTest {
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@Test
void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
// Record measurements for CARDINALITY_LIMIT - 1, since 1 slot is reserved for the overflow
// series
for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle();
// First collect
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
Assertions.assertThat(sumData.getPoints())
.hasSize(CARDINALITY_LIMIT - 1)
.allSatisfy(
point -> {
Assertions.assertThat(point.getStartEpochNanos()).isEqualTo(0);
Assertions.assertThat(point.getEpochNanos()).isEqualTo(10);
Assertions.assertThat(point.getValue()).isEqualTo(3);
})));
assertThat(logs.getEvents()).isEmpty();
deltaReader.setLastCollectEpochNanos(10);
// Record CARDINALITY_LIMIT measurements, causing one measurement to exceed the cardinality
// limit and fall into the overflow series
for (int i = 0; i < CARDINALITY_LIMIT; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
// After first collection, we expect the secondary map which is empty to be used,
// hence handle creation will still take place
// The +1 is for the overflow handle
verify(aggregator, times((CARDINALITY_LIMIT - 1) * 2 + 1)).createHandle();
// Second collect
metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20);
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(CARDINALITY_LIMIT)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(10);
assertThat(point.getEpochNanos()).isEqualTo(20);
assertThat(point.getValue()).isEqualTo(3);
})
.noneMatch(
point ->
("value" + CARDINALITY_LIMIT)
.equals(
point
.getAttributes()
.get(AttributeKey.stringKey("key"))))
.satisfiesOnlyOnce(
point ->
assertThat(point.getAttributes())
.isEqualTo(MetricStorage.CARDINALITY_OVERFLOW))));
assertThat(storage.getAggregatorHandlePool()).isEmpty();
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}
@Test
void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode_ExpireUnused() {
initialize(MemoryMode.REUSABLE_DATA);
DefaultSynchronousMetricStorage<?, ?> storage =
new DefaultSynchronousMetricStorage<>(
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
// 1st recording: Recording goes to active map
for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
// This will switch next recordings to the secondary map (which is empty)
// by making it the active map
storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
// 2nd recording
deltaReader.setLastCollectEpochNanos(10);
for (int i = 0; i < CARDINALITY_LIMIT - 1; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
// This switches maps again, so next recordings will be to the first map
storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10, 20);
// 3rd recording: We're recording unseen attributes to a map we know is full,
// since it was filled during 1st recording
deltaReader.setLastCollectEpochNanos(20);
for (int i = CARDINALITY_LIMIT - 1; i < (CARDINALITY_LIMIT - 1) + 15; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20, 30);
assertOnlyOverflowWasRecorded(metricData, 20, 30, 15 * 3);
// 4th recording: We're recording unseen attributes to a map we know is full,
// since it was filled during *2nd* recording
deltaReader.setLastCollectEpochNanos(30);
for (int i = CARDINALITY_LIMIT - 1; i < (CARDINALITY_LIMIT - 1) + 15; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30, 40);
assertOnlyOverflowWasRecorded(metricData, 30, 40, 15 * 3);
// 5th recording: Map should be empty, since all handlers were removed due to
// no recording being done to them
deltaReader.setLastCollectEpochNanos(40);
for (int i = 0; i < 10; i++) {
storage.recordDouble(
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 40, 50);
assertNumberOfPoints(metricData, 10);
assertAllPointsWithValue(metricData, 40, 50, 3);
assertOverflowDoesNotExists(metricData);
// 6th recording: Map should be empty (we switched to secondary map), since all handlers
// were removed due to no recordings being done to them
deltaReader.setLastCollectEpochNanos(50);
for (int i = 0; i < 12; i++) {
storage.recordDouble(
4, Attributes.builder().put("key", "value" + i).build(), Context.current());
}
metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 50, 60);
assertNumberOfPoints(metricData, 12);
assertAllPointsWithValue(metricData, 50, 60, 4);
assertOverflowDoesNotExists(metricData);
}
@SuppressWarnings("SameParameterValue")
private static void assertOnlyOverflowWasRecorded(
MetricData metricData, long startTime, long endTime, double value) {
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.hasSize(1)
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(startTime);
assertThat(point.getEpochNanos()).isEqualTo(endTime);
assertThat(point.getValue()).isEqualTo(value);
assertThat(point.getAttributes())
.isEqualTo(MetricStorage.CARDINALITY_OVERFLOW);
})));
}
private static void assertNumberOfPoints(MetricData metricData, int numberOfPoints) {
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(sumData -> assertThat(sumData.getPoints()).hasSize(numberOfPoints)));
}
private static void assertAllPointsWithValue(
MetricData metricData, long startTime, long endTime, double value) {
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.allSatisfy(
point -> {
assertThat(point.getStartEpochNanos()).isEqualTo(startTime);
assertThat(point.getEpochNanos()).isEqualTo(endTime);
assertThat(point.getValue()).isEqualTo(value);
})));
}
private static void assertOverflowDoesNotExists(MetricData metricData) {
assertThat(metricData)
.hasDoubleSumSatisfying(
sum ->
sum.satisfies(
sumData ->
assertThat(sumData.getPoints())
.noneMatch(
point ->
point
.getAttributes()
.equals(MetricStorage.CARDINALITY_OVERFLOW))));
}
@ParameterizedTest
@MethodSource("concurrentStressTestArguments")
void recordAndCollect_concurrentStressTest(
@ -439,29 +793,45 @@ public class SynchronousMetricStorageTest {
}
private static Stream<Arguments> concurrentStressTestArguments() {
Aggregator<PointData, ExemplarData> aggregator =
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
return Stream.of(
Arguments.of(
// Delta
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.addAndGet(value)),
Arguments.of(
// Cumulative
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.set(value)));
List<Arguments> argumentsList = new ArrayList<>();
for (MemoryMode memoryMode : MemoryMode.values()) {
Aggregator<PointData, ExemplarData> aggregator =
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
argumentsList.add(
Arguments.of(
// Delta
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(
InMemoryMetricReader.builder()
.setAggregationTemporalitySelector(unused -> AggregationTemporality.DELTA)
.setMemoryMode(memoryMode)
.build(),
ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.addAndGet(value)));
argumentsList.add(
Arguments.of(
// Cumulative
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(
InMemoryMetricReader.builder().setMemoryMode(memoryMode).build(),
ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.set(value)));
}
return argumentsList.stream();
}
}