Experimental metric reader and view cardinality limits (#5494)
This commit is contained in:
parent
4d034b08e8
commit
331c6af8d6
|
|
@ -421,7 +421,7 @@ class OpenTelemetrySdkTest {
|
|||
+ "clock=SystemClock{}, "
|
||||
+ "resource=Resource{schemaUrl=null, attributes={service.name=\"otel-test\"}}, "
|
||||
+ "metricReaders=[PeriodicMetricReader{exporter=MockMetricExporter{}, intervalNanos=60000000000}], "
|
||||
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}}]"
|
||||
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}}]"
|
||||
+ "}, "
|
||||
+ "loggerProvider=SdkLoggerProvider{"
|
||||
+ "clock=SystemClock{}, "
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
|
|||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
|
||||
|
|
@ -26,6 +27,7 @@ import java.io.Closeable;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
|
@ -54,17 +56,19 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
|
||||
SdkMeterProvider(
|
||||
List<RegisteredView> registeredViews,
|
||||
List<MetricReader> metricReaders,
|
||||
IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders,
|
||||
Clock clock,
|
||||
Resource resource,
|
||||
ExemplarFilter exemplarFilter) {
|
||||
long startEpochNanos = clock.now();
|
||||
this.registeredViews = registeredViews;
|
||||
this.registeredReaders =
|
||||
metricReaders.stream()
|
||||
metricReaders.entrySet().stream()
|
||||
.map(
|
||||
reader ->
|
||||
RegisteredReader.create(reader, ViewRegistry.create(reader, registeredViews)))
|
||||
entry ->
|
||||
RegisteredReader.create(
|
||||
entry.getKey(),
|
||||
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
|
||||
.collect(toList());
|
||||
this.sharedState =
|
||||
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
|
||||
|
|
|
|||
|
|
@ -10,9 +10,11 @@ import io.opentelemetry.sdk.metrics.export.MetricReader;
|
|||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
|
||||
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.RegisteredView;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
|
@ -32,7 +34,8 @@ public final class SdkMeterProviderBuilder {
|
|||
|
||||
private Clock clock = Clock.getDefault();
|
||||
private Resource resource = Resource.getDefault();
|
||||
private final List<MetricReader> metricReaders = new ArrayList<>();
|
||||
private final IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders =
|
||||
new IdentityHashMap<>();
|
||||
private final List<RegisteredView> registeredViews = new ArrayList<>();
|
||||
private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER;
|
||||
|
||||
|
|
@ -96,7 +99,11 @@ public final class SdkMeterProviderBuilder {
|
|||
Objects.requireNonNull(view, "view");
|
||||
registeredViews.add(
|
||||
RegisteredView.create(
|
||||
selector, view, view.getAttributesProcessor(), SourceInfo.fromCurrentStack()));
|
||||
selector,
|
||||
view,
|
||||
view.getAttributesProcessor(),
|
||||
view.getCardinalityLimit(),
|
||||
SourceInfo.fromCurrentStack()));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -106,7 +113,20 @@ public final class SdkMeterProviderBuilder {
|
|||
* <p>Note: custom implementations of {@link MetricReader} are not currently supported.
|
||||
*/
|
||||
public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) {
|
||||
metricReaders.add(reader);
|
||||
metricReaders.put(reader, CardinalityLimitSelector.defaultCardinalityLimitSelector());
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a {@link MetricReader} with a {@link CardinalityLimitSelector}.
|
||||
*
|
||||
* <p>Note: not currently stable but available for experimental use via {@link
|
||||
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
|
||||
* MetricReader, CardinalityLimitSelector)}.
|
||||
*/
|
||||
SdkMeterProviderBuilder registerMetricReader(
|
||||
MetricReader reader, CardinalityLimitSelector cardinalityLimitSelector) {
|
||||
metricReaders.put(reader, cardinalityLimitSelector);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,8 +32,10 @@ public abstract class View {
|
|||
@Nullable String name,
|
||||
@Nullable String description,
|
||||
Aggregation aggregation,
|
||||
AttributesProcessor attributesProcessor) {
|
||||
return new AutoValue_View(name, description, aggregation, attributesProcessor);
|
||||
AttributesProcessor attributesProcessor,
|
||||
int cardinalityLimit) {
|
||||
return new AutoValue_View(
|
||||
name, description, aggregation, attributesProcessor, cardinalityLimit);
|
||||
}
|
||||
|
||||
View() {}
|
||||
|
|
@ -58,6 +60,9 @@ public abstract class View {
|
|||
/** Returns the attribute processor used for this view. */
|
||||
abstract AttributesProcessor getAttributesProcessor();
|
||||
|
||||
/** Returns the cardinality limit for this view. */
|
||||
abstract int getCardinalityLimit();
|
||||
|
||||
@Override
|
||||
public final String toString() {
|
||||
StringJoiner joiner = new StringJoiner(", ", "View{", "}");
|
||||
|
|
@ -69,6 +74,7 @@ public abstract class View {
|
|||
}
|
||||
joiner.add("aggregation=" + getAggregation());
|
||||
joiner.add("attributesProcessor=" + getAttributesProcessor());
|
||||
joiner.add("cardinalityLimit=" + getCardinalityLimit());
|
||||
return joiner.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package io.opentelemetry.sdk.metrics;
|
|||
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Predicate;
|
||||
|
|
@ -23,6 +24,7 @@ public final class ViewBuilder {
|
|||
@Nullable private String description;
|
||||
private Aggregation aggregation = Aggregation.defaultAggregation();
|
||||
private AttributesProcessor processor = AttributesProcessor.noop();
|
||||
private int cardinalityLimit = MetricStorage.DEFAULT_MAX_CARDINALITY;
|
||||
|
||||
ViewBuilder() {}
|
||||
|
||||
|
|
@ -85,8 +87,24 @@ public final class ViewBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the cardinality limit.
|
||||
*
|
||||
* <p>Note: not currently stable but cardinality limit can be configured via
|
||||
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
|
||||
*
|
||||
* @param cardinalityLimit the maximum number of series for a metric
|
||||
*/
|
||||
ViewBuilder setCardinalityLimit(int cardinalityLimit) {
|
||||
if (cardinalityLimit <= 0) {
|
||||
throw new IllegalArgumentException("cardinalityLimit must be > 0");
|
||||
}
|
||||
this.cardinalityLimit = cardinalityLimit;
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Returns a {@link View} with the configuration of this builder. */
|
||||
public View build() {
|
||||
return View.create(name, description, aggregation, processor);
|
||||
return View.create(name, description, aggregation, processor, cardinalityLimit);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,9 @@ package io.opentelemetry.sdk.metrics.internal;
|
|||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||||
import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.StringPredicates;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
|
@ -26,7 +28,7 @@ public final class SdkMeterProviderUtil {
|
|||
/**
|
||||
* Reflectively assign the {@link ExemplarFilter} to the {@link SdkMeterProviderBuilder}.
|
||||
*
|
||||
* @param sdkMeterProviderBuilder the
|
||||
* @param sdkMeterProviderBuilder the builder
|
||||
*/
|
||||
public static void setExemplarFilter(
|
||||
SdkMeterProviderBuilder sdkMeterProviderBuilder, ExemplarFilter exemplarFilter) {
|
||||
|
|
@ -42,6 +44,28 @@ public final class SdkMeterProviderUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reflectively add a {@link MetricReader} with the {@link CardinalityLimitSelector} to the {@link
|
||||
* SdkMeterProviderBuilder}.
|
||||
*
|
||||
* @param sdkMeterProviderBuilder the builder
|
||||
*/
|
||||
public static void registerMetricReaderWithCardinalitySelector(
|
||||
SdkMeterProviderBuilder sdkMeterProviderBuilder,
|
||||
MetricReader metricReader,
|
||||
CardinalityLimitSelector cardinalityLimitSelector) {
|
||||
try {
|
||||
Method method =
|
||||
SdkMeterProviderBuilder.class.getDeclaredMethod(
|
||||
"registerMetricReader", MetricReader.class, CardinalityLimitSelector.class);
|
||||
method.setAccessible(true);
|
||||
method.invoke(sdkMeterProviderBuilder, metricReader, cardinalityLimitSelector);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
|
||||
throw new IllegalStateException(
|
||||
"Error calling addMetricReader on SdkMeterProviderBuilder", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
|
||||
* key-values from baggage to all measurements.
|
||||
|
|
@ -81,6 +105,21 @@ public final class SdkMeterProviderUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reflectively set the {@code cardinalityLimit} on the {@link ViewBuilder}.
|
||||
*
|
||||
* @param viewBuilder the builder
|
||||
*/
|
||||
public static void setCardinalityLimit(ViewBuilder viewBuilder, int cardinalityLimit) {
|
||||
try {
|
||||
Method method = ViewBuilder.class.getDeclaredMethod("setCardinalityLimit", int.class);
|
||||
method.setAccessible(true);
|
||||
method.invoke(viewBuilder, cardinalityLimit);
|
||||
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
||||
throw new IllegalStateException("Error setting cardinalityLimit on ViewBuilder", e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Reflectively reset the {@link SdkMeterProvider}, clearing all registered instruments. */
|
||||
public static void resetForTest(SdkMeterProvider sdkMeterProvider) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.metrics.internal.export;
|
||||
|
||||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
|
||||
/**
|
||||
* Customize the {@link io.opentelemetry.sdk.metrics.export.MetricReader} cardinality limit as a
|
||||
* function of {@link InstrumentType}. Register via {@link
|
||||
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
|
||||
* MetricReader, CardinalityLimitSelector)}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CardinalityLimitSelector {
|
||||
|
||||
/**
|
||||
* The default {@link CardinalityLimitSelector}, allowing each metric to have {@code 2000} points.
|
||||
*/
|
||||
static CardinalityLimitSelector defaultCardinalityLimitSelector() {
|
||||
return unused -> MetricStorage.DEFAULT_MAX_CARDINALITY;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the default cardinality limit for metrics from instruments of type {@code
|
||||
* instrumentType}. The cardinality limit dictates the maximum number of distinct points (or time
|
||||
* series) for the metric.
|
||||
*/
|
||||
int getCardinalityLimit(InstrumentType instrumentType);
|
||||
}
|
||||
|
|
@ -46,6 +46,7 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
|
|||
private final AggregationTemporality aggregationTemporality;
|
||||
private final Aggregator<T, U> aggregator;
|
||||
private final AttributesProcessor attributesProcessor;
|
||||
private final int maxCardinality;
|
||||
private Map<Attributes, T> points = new HashMap<>();
|
||||
private Map<Attributes, T> lastPoints =
|
||||
new HashMap<>(); // Only populated if aggregationTemporality == DELTA
|
||||
|
|
@ -54,7 +55,8 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
|
|||
RegisteredReader registeredReader,
|
||||
MetricDescriptor metricDescriptor,
|
||||
Aggregator<T, U> aggregator,
|
||||
AttributesProcessor attributesProcessor) {
|
||||
AttributesProcessor attributesProcessor,
|
||||
int maxCardinality) {
|
||||
this.registeredReader = registeredReader;
|
||||
this.metricDescriptor = metricDescriptor;
|
||||
this.aggregationTemporality =
|
||||
|
|
@ -63,6 +65,7 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
|
|||
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
|
||||
this.aggregator = aggregator;
|
||||
this.attributesProcessor = attributesProcessor;
|
||||
this.maxCardinality = maxCardinality;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -83,7 +86,8 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
|
|||
registeredReader,
|
||||
metricDescriptor,
|
||||
aggregator,
|
||||
registeredView.getViewAttributesProcessor());
|
||||
registeredView.getViewAttributesProcessor(),
|
||||
registeredView.getCardinalityLimit());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -109,13 +113,13 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
|
|||
private void recordPoint(T point) {
|
||||
Attributes attributes = point.getAttributes();
|
||||
|
||||
if (points.size() >= MetricStorage.MAX_CARDINALITY) {
|
||||
if (points.size() >= maxCardinality) {
|
||||
throttlingLogger.log(
|
||||
Level.WARNING,
|
||||
"Instrument "
|
||||
+ metricDescriptor.getSourceInstrument().getName()
|
||||
+ " has exceeded the maximum allowed cardinality ("
|
||||
+ MetricStorage.MAX_CARDINALITY
|
||||
+ maxCardinality
|
||||
+ ").");
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
|
||||
new ConcurrentHashMap<>();
|
||||
private final AttributesProcessor attributesProcessor;
|
||||
private final int maxCardinality;
|
||||
private final ConcurrentLinkedQueue<AggregatorHandle<T, U>> aggregatorHandlePool =
|
||||
new ConcurrentLinkedQueue<>();
|
||||
|
||||
|
|
@ -57,7 +58,8 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
RegisteredReader registeredReader,
|
||||
MetricDescriptor metricDescriptor,
|
||||
Aggregator<T, U> aggregator,
|
||||
AttributesProcessor attributesProcessor) {
|
||||
AttributesProcessor attributesProcessor,
|
||||
int maxCardinality) {
|
||||
this.registeredReader = registeredReader;
|
||||
this.metricDescriptor = metricDescriptor;
|
||||
this.aggregationTemporality =
|
||||
|
|
@ -66,6 +68,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
|
||||
this.aggregator = aggregator;
|
||||
this.attributesProcessor = attributesProcessor;
|
||||
this.maxCardinality = maxCardinality;
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
|
|
@ -97,13 +100,13 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
if (handle != null) {
|
||||
return handle;
|
||||
}
|
||||
if (aggregatorHandles.size() >= MAX_CARDINALITY) {
|
||||
if (aggregatorHandles.size() >= maxCardinality) {
|
||||
logger.log(
|
||||
Level.WARNING,
|
||||
"Instrument "
|
||||
+ metricDescriptor.getSourceInstrument().getName()
|
||||
+ " has exceeded the maximum allowed cardinality ("
|
||||
+ MAX_CARDINALITY
|
||||
+ maxCardinality
|
||||
+ ").");
|
||||
return null;
|
||||
}
|
||||
|
|
@ -143,9 +146,9 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
}
|
||||
});
|
||||
|
||||
// Trim pool down if needed. pool.size() will only exceed MAX_CARDINALITY if new handles are
|
||||
// Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
|
||||
// created during collection.
|
||||
int toDelete = aggregatorHandlePool.size() - MAX_CARDINALITY;
|
||||
int toDelete = aggregatorHandlePool.size() - maxCardinality;
|
||||
for (int i = 0; i < toDelete; i++) {
|
||||
aggregatorHandlePool.poll();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,8 +19,8 @@ import io.opentelemetry.sdk.resources.Resource;
|
|||
*/
|
||||
public interface MetricStorage {
|
||||
|
||||
/** The max number of distinct metric points for a particular {@link MetricStorage}. */
|
||||
int MAX_CARDINALITY = 2000;
|
||||
/** The default max number of distinct metric points for a particular {@link MetricStorage}. */
|
||||
int DEFAULT_MAX_CARDINALITY = 2000;
|
||||
|
||||
/** Returns a description of the metric produced in this storage. */
|
||||
MetricDescriptor getMetricDescriptor();
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ public interface SynchronousMetricStorage extends MetricStorage, WriteableMetric
|
|||
registeredReader,
|
||||
metricDescriptor,
|
||||
aggregator,
|
||||
registeredView.getViewAttributesProcessor());
|
||||
registeredView.getViewAttributesProcessor(),
|
||||
registeredView.getCardinalityLimit());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,8 +25,10 @@ public abstract class RegisteredView {
|
|||
InstrumentSelector selector,
|
||||
View view,
|
||||
AttributesProcessor viewAttributesProcessor,
|
||||
int cardinalityLimit,
|
||||
SourceInfo viewSourceInfo) {
|
||||
return new AutoValue_RegisteredView(selector, view, viewAttributesProcessor, viewSourceInfo);
|
||||
return new AutoValue_RegisteredView(
|
||||
selector, view, viewAttributesProcessor, cardinalityLimit, viewSourceInfo);
|
||||
}
|
||||
|
||||
RegisteredView() {}
|
||||
|
|
@ -40,6 +42,9 @@ public abstract class RegisteredView {
|
|||
/** The view's {@link AttributesProcessor}. */
|
||||
public abstract AttributesProcessor getViewAttributesProcessor();
|
||||
|
||||
/** The view's cardinality limit. */
|
||||
public abstract int getCardinalityLimit();
|
||||
|
||||
/** The {@link SourceInfo} from where the view was registered. */
|
||||
public abstract SourceInfo getViewSourceInfo();
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ import io.opentelemetry.sdk.metrics.internal.aggregator.AggregationUtil;
|
|||
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
|
||||
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
|
||||
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -45,6 +47,7 @@ public final class ViewRegistry {
|
|||
InstrumentSelector.builder().setName("*").build(),
|
||||
DEFAULT_VIEW,
|
||||
NOOP,
|
||||
MetricStorage.DEFAULT_MAX_CARDINALITY,
|
||||
SourceInfo.noSourceInfo());
|
||||
private static final Logger logger = Logger.getLogger(ViewRegistry.class.getName());
|
||||
|
||||
|
|
@ -52,7 +55,9 @@ public final class ViewRegistry {
|
|||
private final List<RegisteredView> registeredViews;
|
||||
|
||||
ViewRegistry(
|
||||
DefaultAggregationSelector defaultAggregationSelector, List<RegisteredView> registeredViews) {
|
||||
DefaultAggregationSelector defaultAggregationSelector,
|
||||
CardinalityLimitSelector cardinalityLimitSelector,
|
||||
List<RegisteredView> registeredViews) {
|
||||
instrumentDefaultRegisteredView = new HashMap<>();
|
||||
for (InstrumentType instrumentType : InstrumentType.values()) {
|
||||
instrumentDefaultRegisteredView.put(
|
||||
|
|
@ -63,6 +68,7 @@ public final class ViewRegistry {
|
|||
.setAggregation(defaultAggregationSelector.getDefaultAggregation(instrumentType))
|
||||
.build(),
|
||||
AttributesProcessor.noop(),
|
||||
cardinalityLimitSelector.getCardinalityLimit(instrumentType),
|
||||
SourceInfo.noSourceInfo()));
|
||||
}
|
||||
this.registeredViews = registeredViews;
|
||||
|
|
@ -70,13 +76,19 @@ public final class ViewRegistry {
|
|||
|
||||
/** Returns a {@link ViewRegistry}. */
|
||||
public static ViewRegistry create(
|
||||
DefaultAggregationSelector defaultAggregationSelector, List<RegisteredView> registeredViews) {
|
||||
return new ViewRegistry(defaultAggregationSelector, new ArrayList<>(registeredViews));
|
||||
DefaultAggregationSelector defaultAggregationSelector,
|
||||
CardinalityLimitSelector cardinalityLimitSelector,
|
||||
List<RegisteredView> registeredViews) {
|
||||
return new ViewRegistry(
|
||||
defaultAggregationSelector, cardinalityLimitSelector, new ArrayList<>(registeredViews));
|
||||
}
|
||||
|
||||
/** Return a {@link ViewRegistry} using the default aggregation and no views registered. */
|
||||
public static ViewRegistry create() {
|
||||
return create(unused -> Aggregation.defaultAggregation(), Collections.emptyList());
|
||||
return create(
|
||||
unused -> Aggregation.defaultAggregation(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.emptyList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -113,7 +125,7 @@ public final class ViewRegistry {
|
|||
return Collections.unmodifiableList(result);
|
||||
}
|
||||
|
||||
// Not views matched, use default view
|
||||
// No views matched, use default view
|
||||
RegisteredView instrumentDefaultView =
|
||||
Objects.requireNonNull(instrumentDefaultRegisteredView.get(descriptor.getType()));
|
||||
AggregatorFactory viewAggregatorFactory =
|
||||
|
|
|
|||
|
|
@ -5,16 +5,24 @@
|
|||
|
||||
package io.opentelemetry.sdk.metrics;
|
||||
|
||||
import static io.opentelemetry.sdk.metrics.internal.state.MetricStorage.DEFAULT_MAX_CARDINALITY;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.metrics.LongCounter;
|
||||
import io.opentelemetry.api.metrics.LongHistogram;
|
||||
import io.opentelemetry.api.metrics.Meter;
|
||||
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
|
||||
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
|
||||
import io.opentelemetry.sdk.metrics.data.Data;
|
||||
import io.opentelemetry.sdk.metrics.data.LongPointData;
|
||||
import io.opentelemetry.sdk.metrics.data.SumData;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.DefaultSynchronousMetricStorage;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
|
@ -26,9 +34,6 @@ import org.junit.jupiter.api.Test;
|
|||
@SuppressLogger(DefaultSynchronousMetricStorage.class)
|
||||
class CardinalityTest {
|
||||
|
||||
/** Traces {@code MetricStorageUtils#MAX_CARDINALITY}. */
|
||||
private static final int MAX_CARDINALITY = 2000;
|
||||
|
||||
private InMemoryMetricReader deltaReader;
|
||||
private InMemoryMetricReader cumulativeReader;
|
||||
private Meter meter;
|
||||
|
|
@ -50,14 +55,14 @@ class CardinalityTest {
|
|||
* are dropped for delta and cumulative readers. Stale metrics are those with attributes that did
|
||||
* not receive recordings in the most recent collection.
|
||||
*
|
||||
* <p>Effectively, we make sure we cap-out at attribute size = 2000 (constant in
|
||||
* MetricStorageutils).
|
||||
* <p>Effectively, we make sure we cap-out at attribute size = {@link
|
||||
* MetricStorage#DEFAULT_MAX_CARDINALITY}.
|
||||
*/
|
||||
@Test
|
||||
void staleMetricsDropped_synchronousInstrument() {
|
||||
LongCounter syncCounter = meter.counterBuilder("sync-counter").build();
|
||||
// Note: This constant comes from MetricStorageUtils, but it's package-private.
|
||||
for (int i = 1; i <= 2000; i++) {
|
||||
for (int i = 1; i <= DEFAULT_MAX_CARDINALITY; i++) {
|
||||
syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build());
|
||||
|
||||
// DELTA reader only has latest
|
||||
|
|
@ -87,7 +92,7 @@ class CardinalityTest {
|
|||
.isEqualTo(currentSize))));
|
||||
}
|
||||
// Now punch the limit and ONLY metrics we just recorded stay, due to simplistic GC.
|
||||
for (int i = 2001; i <= 2010; i++) {
|
||||
for (int i = DEFAULT_MAX_CARDINALITY + 1; i <= DEFAULT_MAX_CARDINALITY + 10; i++) {
|
||||
syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build());
|
||||
}
|
||||
assertThat(deltaReader.collectAllMetrics())
|
||||
|
|
@ -118,7 +123,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(2000))));
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -165,7 +170,7 @@ class CardinalityTest {
|
|||
void cardinalityLimits_synchronousInstrument() {
|
||||
LongCounter syncCounter1 = meter.counterBuilder("sync-counter1").build();
|
||||
LongCounter syncCounter2 = meter.counterBuilder("sync-counter2").build();
|
||||
for (int i = 0; i < MAX_CARDINALITY + 1; i++) {
|
||||
for (int i = 0; i < DEFAULT_MAX_CARDINALITY + 1; i++) {
|
||||
syncCounter1.add(1, Attributes.builder().put("key", "value" + i).build());
|
||||
syncCounter2.add(1, Attributes.builder().put("key", "value" + i).build());
|
||||
}
|
||||
|
|
@ -183,7 +188,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))),
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("sync-counter2")
|
||||
|
|
@ -194,7 +199,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))));
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))));
|
||||
|
||||
assertThat(cumulativeReader.collectAllMetrics())
|
||||
.as("Cumulative collection")
|
||||
|
|
@ -209,7 +214,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))),
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("sync-counter2")
|
||||
|
|
@ -220,7 +225,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))));
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -231,7 +236,7 @@ class CardinalityTest {
|
|||
void cardinalityLimits_asynchronousInstrument() {
|
||||
Consumer<ObservableLongMeasurement> callback =
|
||||
measurement -> {
|
||||
for (int i = 0; i < MAX_CARDINALITY + 1; i++) {
|
||||
for (int i = 0; i < DEFAULT_MAX_CARDINALITY + 1; i++) {
|
||||
measurement.record(1, Attributes.builder().put("key", "value" + i).build());
|
||||
}
|
||||
};
|
||||
|
|
@ -251,7 +256,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))),
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("async-counter2")
|
||||
|
|
@ -262,7 +267,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))));
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))));
|
||||
|
||||
assertThat(cumulativeReader.collectAllMetrics())
|
||||
.as("Cumulative collection")
|
||||
|
|
@ -277,7 +282,7 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))),
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("async-counter2")
|
||||
|
|
@ -288,6 +293,213 @@ class CardinalityTest {
|
|||
(Consumer<SumData<LongPointData>>)
|
||||
sumPointData ->
|
||||
assertThat(sumPointData.getPoints().size())
|
||||
.isEqualTo(MAX_CARDINALITY))));
|
||||
.isEqualTo(DEFAULT_MAX_CARDINALITY))));
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate ability to customize metric reader cardinality limits via {@link
|
||||
* SdkMeterProviderBuilder#registerMetricReader(MetricReader, CardinalityLimitSelector)}, and view
|
||||
* cardinality limits via {@link ViewBuilder#setCardinalityLimit(int)}.
|
||||
*/
|
||||
@Test
|
||||
void readerAndViewCardinalityConfiguration() {
|
||||
int counterLimit = 10;
|
||||
int generalLimit = 20;
|
||||
int counter2Limit = 30;
|
||||
|
||||
// Define a cardinality selector which has one limit for counters, and another general limit for
|
||||
// other instrument kinds
|
||||
CardinalityLimitSelector cardinalityLimitSelector =
|
||||
instrumentType -> instrumentType == InstrumentType.COUNTER ? counterLimit : generalLimit;
|
||||
SdkMeterProviderBuilder builder = SdkMeterProvider.builder();
|
||||
|
||||
// Register both the delta and cumulative reader with the customized cardinality selector
|
||||
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(
|
||||
builder, deltaReader, cardinalityLimitSelector);
|
||||
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(
|
||||
builder, cumulativeReader, cardinalityLimitSelector);
|
||||
|
||||
// Register a view which defines a custom cardinality limit for instrumented named "counter2"
|
||||
ViewBuilder viewBuilder = View.builder();
|
||||
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, counter2Limit);
|
||||
builder.registerView(
|
||||
InstrumentSelector.builder().setName("counter2").build(), viewBuilder.build());
|
||||
|
||||
SdkMeterProvider sdkMeterProvider = builder.build();
|
||||
meter = sdkMeterProvider.get(CardinalityTest.class.getName());
|
||||
|
||||
LongCounter counter1 = meter.counterBuilder("counter1").build();
|
||||
LongCounter counter2 = meter.counterBuilder("counter2").build();
|
||||
LongHistogram histogram = meter.histogramBuilder("histogram").ofLongs().build();
|
||||
|
||||
// Record enough measurements to exceed cardinality threshold
|
||||
for (int i = 0; i < DEFAULT_MAX_CARDINALITY; i++) {
|
||||
counter1.add(1, Attributes.builder().put("key", i).build());
|
||||
counter2.add(1, Attributes.builder().put("key", i).build());
|
||||
histogram.record(1, Attributes.builder().put("key", i).build());
|
||||
}
|
||||
|
||||
// Assert that each instrument has the appropriate number of points based on cardinality limits:
|
||||
// - counter1 should have counterLimit points
|
||||
// - counter2 should have counter2Limit points
|
||||
// - histogram should have generalLimit points
|
||||
assertThat(deltaReader.collectAllMetrics())
|
||||
.as("delta collection")
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter1")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isDelta()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counterLimit, 0, counterLimit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter2")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isDelta()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counter2Limit, 0, counter2Limit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("histogram")
|
||||
.hasHistogramSatisfying(
|
||||
histogramMetric ->
|
||||
histogramMetric
|
||||
.isDelta()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, generalLimit, 0, generalLimit))));
|
||||
assertThat(cumulativeReader.collectAllMetrics())
|
||||
.as("cumulative collection")
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter1")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counterLimit, 0, counterLimit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter2")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counter2Limit, 0, counter2Limit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("histogram")
|
||||
.hasHistogramSatisfying(
|
||||
histogramMetric ->
|
||||
histogramMetric
|
||||
.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, generalLimit, 0, generalLimit))));
|
||||
|
||||
// Record another round of measurements, again exceeding cardinality limits
|
||||
for (int i = DEFAULT_MAX_CARDINALITY; i < DEFAULT_MAX_CARDINALITY * 2; i++) {
|
||||
counter1.add(1, Attributes.builder().put("key", i).build());
|
||||
counter2.add(1, Attributes.builder().put("key", i).build());
|
||||
histogram.record(1, Attributes.builder().put("key", i).build());
|
||||
}
|
||||
|
||||
// Delta reader should have new points, forgetting the points with measurements recorded prior
|
||||
// to last collection
|
||||
assertThat(deltaReader.collectAllMetrics())
|
||||
.as("delta collection")
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter1")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isDelta()
|
||||
.satisfies(
|
||||
data ->
|
||||
pointsAssert(
|
||||
data,
|
||||
counterLimit,
|
||||
DEFAULT_MAX_CARDINALITY,
|
||||
DEFAULT_MAX_CARDINALITY + counterLimit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter2")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isDelta()
|
||||
.satisfies(
|
||||
data ->
|
||||
pointsAssert(
|
||||
data,
|
||||
counter2Limit,
|
||||
DEFAULT_MAX_CARDINALITY,
|
||||
DEFAULT_MAX_CARDINALITY + counter2Limit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("histogram")
|
||||
.hasHistogramSatisfying(
|
||||
histogramMetric ->
|
||||
histogramMetric
|
||||
.isDelta()
|
||||
.satisfies(
|
||||
data ->
|
||||
pointsAssert(
|
||||
data,
|
||||
generalLimit,
|
||||
DEFAULT_MAX_CARDINALITY,
|
||||
DEFAULT_MAX_CARDINALITY + generalLimit))));
|
||||
|
||||
// Cumulative reader should retain old points, dropping the new measurements
|
||||
assertThat(cumulativeReader.collectAllMetrics())
|
||||
.as("cumulative collection")
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter1")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counterLimit, 0, counterLimit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("counter2")
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, counter2Limit, 0, counter2Limit))),
|
||||
metricData ->
|
||||
assertThat(metricData)
|
||||
.hasName("histogram")
|
||||
.hasHistogramSatisfying(
|
||||
histogramMetric ->
|
||||
histogramMetric
|
||||
.isCumulative()
|
||||
.satisfies(
|
||||
data -> pointsAssert(data, generalLimit, 0, generalLimit))));
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for {@link #readerAndViewCardinalityConfiguration()}. Asserts that the {@code
|
||||
* data} contains the {@code expectedNumPoints}, and has the attribute "key" values in the range
|
||||
* [{@code minAttributeValueInclusive}, {@code maxAttributeValueExclusive}).
|
||||
*/
|
||||
private static void pointsAssert(
|
||||
Data<?> data,
|
||||
int expectedNumPoints,
|
||||
int minAttributeValueInclusive,
|
||||
int maxAttributeValueExclusive) {
|
||||
assertThat(data.getPoints())
|
||||
.hasSize(expectedNumPoints)
|
||||
.allSatisfy(
|
||||
point ->
|
||||
assertThat(point.getAttributes().get(AttributeKey.longKey("key")))
|
||||
.isGreaterThanOrEqualTo(minAttributeValueInclusive)
|
||||
.isLessThan(maxAttributeValueExclusive));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,12 +15,17 @@ class ViewTest {
|
|||
void stringRepresentation() {
|
||||
assertThat(View.builder().build().toString())
|
||||
.isEqualTo(
|
||||
"View{aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}");
|
||||
"View{"
|
||||
+ "aggregation=DefaultAggregation, "
|
||||
+ "attributesProcessor=NoopAttributesProcessor{}, "
|
||||
+ "cardinalityLimit=2000"
|
||||
+ "}");
|
||||
assertThat(
|
||||
View.builder()
|
||||
.setName("name")
|
||||
.setDescription("description")
|
||||
.setAggregation(Aggregation.sum())
|
||||
.setCardinalityLimit(10)
|
||||
.build()
|
||||
.toString())
|
||||
.isEqualTo(
|
||||
|
|
@ -28,7 +33,8 @@ class ViewTest {
|
|||
+ "name=name, "
|
||||
+ "description=description, "
|
||||
+ "aggregation=SumAggregation, "
|
||||
+ "attributesProcessor=NoopAttributesProcessor{}"
|
||||
+ "attributesProcessor=NoopAttributesProcessor{}, "
|
||||
+ "cardinalityLimit=10"
|
||||
+ "}");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
@ExtendWith(MockitoExtension.class)
|
||||
class AsynchronousMetricStorageTest {
|
||||
|
||||
private static final int CARDINALITY_LIMIT = 25;
|
||||
|
||||
@RegisterExtension
|
||||
LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class);
|
||||
|
||||
|
|
@ -51,7 +53,11 @@ class AsynchronousMetricStorageTest {
|
|||
private final InstrumentSelector selector = InstrumentSelector.builder().setName("*").build();
|
||||
private final RegisteredView registeredView =
|
||||
RegisteredView.create(
|
||||
selector, View.builder().build(), AttributesProcessor.noop(), SourceInfo.noSourceInfo());
|
||||
selector,
|
||||
View.builder().build(),
|
||||
AttributesProcessor.noop(),
|
||||
CARDINALITY_LIMIT,
|
||||
SourceInfo.noSourceInfo());
|
||||
|
||||
@Mock private MetricReader reader;
|
||||
private RegisteredReader registeredReader;
|
||||
|
|
@ -149,6 +155,7 @@ class AsynchronousMetricStorageTest {
|
|||
selector,
|
||||
View.builder().build(),
|
||||
AttributesProcessor.filterByKeyName(key -> key.equals("key1")),
|
||||
CARDINALITY_LIMIT,
|
||||
SourceInfo.noSourceInfo()),
|
||||
InstrumentDescriptor.create(
|
||||
"name",
|
||||
|
|
@ -175,7 +182,7 @@ class AsynchronousMetricStorageTest {
|
|||
|
||||
@Test
|
||||
void record_MaxCardinality() {
|
||||
for (int i = 0; i <= MetricStorage.MAX_CARDINALITY + 1; i++) {
|
||||
for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) {
|
||||
longCounterStorage.record(
|
||||
longMeasurement(0, 1, 1, Attributes.builder().put("key" + i, "val").build()));
|
||||
}
|
||||
|
|
@ -183,8 +190,7 @@ class AsynchronousMetricStorageTest {
|
|||
assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime()))
|
||||
.satisfies(
|
||||
metricData ->
|
||||
assertThat(metricData.getLongSumData().getPoints())
|
||||
.hasSize(MetricStorage.MAX_CARDINALITY));
|
||||
assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT));
|
||||
logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ public class SynchronousMetricStorageTest {
|
|||
Advice.empty());
|
||||
private static final MetricDescriptor METRIC_DESCRIPTOR =
|
||||
MetricDescriptor.create("name", "description", "unit");
|
||||
private static final int CARDINALITY_LIMIT = 25;
|
||||
|
||||
@RegisterExtension
|
||||
LogCapturer logs = LogCapturer.create().captureForType(DefaultSynchronousMetricStorage.class);
|
||||
|
|
@ -76,7 +77,11 @@ public class SynchronousMetricStorageTest {
|
|||
AttributesProcessor spyAttributesProcessor = spy(attributesProcessor);
|
||||
SynchronousMetricStorage storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor);
|
||||
cumulativeReader,
|
||||
METRIC_DESCRIPTOR,
|
||||
aggregator,
|
||||
spyAttributesProcessor,
|
||||
CARDINALITY_LIMIT);
|
||||
storage.recordDouble(1, attributes, Context.root());
|
||||
MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now());
|
||||
assertThat(md)
|
||||
|
|
@ -92,7 +97,11 @@ public class SynchronousMetricStorageTest {
|
|||
void recordAndCollect_CumulativeDoesNotReset() {
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
|
||||
cumulativeReader,
|
||||
METRIC_DESCRIPTOR,
|
||||
aggregator,
|
||||
attributesProcessor,
|
||||
CARDINALITY_LIMIT);
|
||||
|
||||
// Record measurement and collect at time 10
|
||||
storage.recordDouble(3, Attributes.empty(), Context.current());
|
||||
|
|
@ -134,7 +143,7 @@ public class SynchronousMetricStorageTest {
|
|||
void recordAndCollect_DeltaResets() {
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
// Record measurement and collect at time 10
|
||||
storage.recordDouble(3, Attributes.empty(), Context.current());
|
||||
|
|
@ -181,14 +190,18 @@ public class SynchronousMetricStorageTest {
|
|||
void recordAndCollect_CumulativeAtLimit() {
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
|
||||
cumulativeReader,
|
||||
METRIC_DESCRIPTOR,
|
||||
aggregator,
|
||||
attributesProcessor,
|
||||
CARDINALITY_LIMIT);
|
||||
|
||||
// Record measurements for max number of attributes
|
||||
for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) {
|
||||
for (int i = 0; i < CARDINALITY_LIMIT; i++) {
|
||||
storage.recordDouble(
|
||||
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
|
||||
}
|
||||
verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
|
||||
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
|
||||
.hasDoubleSumSatisfying(
|
||||
|
|
@ -196,7 +209,7 @@ public class SynchronousMetricStorageTest {
|
|||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints())
|
||||
.hasSize(MetricStorage.MAX_CARDINALITY)
|
||||
.hasSize(CARDINALITY_LIMIT)
|
||||
.allSatisfy(
|
||||
point -> {
|
||||
assertThat(point.getStartEpochNanos()).isEqualTo(0);
|
||||
|
|
@ -210,10 +223,10 @@ public class SynchronousMetricStorageTest {
|
|||
// Record measurement for additional attribute, exceeding limit
|
||||
storage.recordDouble(
|
||||
3,
|
||||
Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
|
||||
Attributes.builder().put("key", "value" + CARDINALITY_LIMIT + 1).build(),
|
||||
Context.current());
|
||||
// Should not create additional handles after MAX_CARDINALITY is reached
|
||||
verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
|
||||
// Should not create additional handles after CARDINALITY_LIMIT is reached
|
||||
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
|
||||
.hasDoubleSumSatisfying(
|
||||
|
|
@ -221,7 +234,7 @@ public class SynchronousMetricStorageTest {
|
|||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints())
|
||||
.hasSize(MetricStorage.MAX_CARDINALITY)
|
||||
.hasSize(CARDINALITY_LIMIT)
|
||||
.allSatisfy(
|
||||
point -> {
|
||||
assertThat(point.getStartEpochNanos()).isEqualTo(0);
|
||||
|
|
@ -233,7 +246,7 @@ public class SynchronousMetricStorageTest {
|
|||
point
|
||||
.getAttributes()
|
||||
.get(AttributeKey.stringKey("key"))
|
||||
.equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
|
||||
.equals("value" + CARDINALITY_LIMIT + 1))));
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
|
||||
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
|
||||
}
|
||||
|
|
@ -242,14 +255,14 @@ public class SynchronousMetricStorageTest {
|
|||
void recordAndCollect_DeltaAtLimit() {
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
// Record measurements for max number of attributes
|
||||
for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) {
|
||||
for (int i = 0; i < CARDINALITY_LIMIT; i++) {
|
||||
storage.recordDouble(
|
||||
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
|
||||
}
|
||||
verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
|
||||
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
|
||||
.hasDoubleSumSatisfying(
|
||||
|
|
@ -257,25 +270,25 @@ public class SynchronousMetricStorageTest {
|
|||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints())
|
||||
.hasSize(MetricStorage.MAX_CARDINALITY)
|
||||
.hasSize(CARDINALITY_LIMIT)
|
||||
.allSatisfy(
|
||||
point -> {
|
||||
assertThat(point.getStartEpochNanos()).isEqualTo(0);
|
||||
assertThat(point.getEpochNanos()).isEqualTo(10);
|
||||
assertThat(point.getValue()).isEqualTo(3);
|
||||
})));
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
|
||||
assertThat(logs.getEvents()).isEmpty();
|
||||
deltaReader.setLastCollectEpochNanos(10);
|
||||
|
||||
// Record measurement for additional attribute, should not exceed limit due to reset
|
||||
storage.recordDouble(
|
||||
3,
|
||||
Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
|
||||
Attributes.builder().put("key", "value" + CARDINALITY_LIMIT + 1).build(),
|
||||
Context.current());
|
||||
// Should use handle returned to pool instead of creating new ones
|
||||
verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY - 1);
|
||||
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1);
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
|
||||
.hasDoubleSumSatisfying(
|
||||
sum ->
|
||||
|
|
@ -288,19 +301,19 @@ public class SynchronousMetricStorageTest {
|
|||
.hasValue(3)
|
||||
.hasAttributes(
|
||||
Attributes.builder()
|
||||
.put("key", "value" + MetricStorage.MAX_CARDINALITY + 1)
|
||||
.put("key", "value" + CARDINALITY_LIMIT + 1)
|
||||
.build())));
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
|
||||
assertThat(logs.getEvents()).isEmpty();
|
||||
deltaReader.setLastCollectEpochNanos(20);
|
||||
|
||||
// Record measurements exceeding max number of attributes. Last measurement should be dropped
|
||||
for (int i = 0; i < MetricStorage.MAX_CARDINALITY + 1; i++) {
|
||||
for (int i = 0; i < CARDINALITY_LIMIT + 1; i++) {
|
||||
storage.recordDouble(
|
||||
3, Attributes.builder().put("key", "value" + i).build(), Context.current());
|
||||
}
|
||||
// Should use handles returned to pool instead of creating new ones
|
||||
verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
|
||||
verify(aggregator, times(CARDINALITY_LIMIT)).createHandle();
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(0);
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
|
||||
.hasDoubleSumSatisfying(
|
||||
|
|
@ -308,7 +321,7 @@ public class SynchronousMetricStorageTest {
|
|||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints())
|
||||
.hasSize(MetricStorage.MAX_CARDINALITY)
|
||||
.hasSize(CARDINALITY_LIMIT)
|
||||
.allSatisfy(
|
||||
point -> {
|
||||
assertThat(point.getStartEpochNanos()).isEqualTo(20);
|
||||
|
|
@ -320,8 +333,8 @@ public class SynchronousMetricStorageTest {
|
|||
point
|
||||
.getAttributes()
|
||||
.get(AttributeKey.stringKey("key"))
|
||||
.equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
|
||||
.equals("value" + CARDINALITY_LIMIT + 1))));
|
||||
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
|
||||
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
|||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||||
import io.opentelemetry.sdk.metrics.View;
|
||||
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class RegisteredViewTest {
|
||||
|
|
@ -33,12 +34,13 @@ class RegisteredViewTest {
|
|||
.setAggregation(Aggregation.sum())
|
||||
.build(),
|
||||
AttributesProcessor.noop(),
|
||||
MetricStorage.DEFAULT_MAX_CARDINALITY,
|
||||
SourceInfo.fromCurrentStack())
|
||||
.toString())
|
||||
.isEqualTo(
|
||||
"RegisteredView{"
|
||||
+ "instrumentSelector=InstrumentSelector{instrumentType=COUNTER, instrumentName=name, meterName=meter-name, meterVersion=meter-version, meterSchemaUrl=meter-schema-url}, "
|
||||
+ "view=View{name=name, description=description, aggregation=SumAggregation, attributesProcessor=NoopAttributesProcessor{}}"
|
||||
+ "view=View{name=name, description=description, aggregation=SumAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}"
|
||||
+ "}");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
|
|||
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
|
||||
import io.opentelemetry.sdk.metrics.internal.descriptor.Advice;
|
||||
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
|
@ -38,7 +40,11 @@ class ViewRegistryTest {
|
|||
|
||||
private static RegisteredView registeredView(InstrumentSelector instrumentSelector, View view) {
|
||||
return RegisteredView.create(
|
||||
instrumentSelector, view, AttributesProcessor.noop(), SourceInfo.fromCurrentStack());
|
||||
instrumentSelector,
|
||||
view,
|
||||
AttributesProcessor.noop(),
|
||||
MetricStorage.DEFAULT_MAX_CARDINALITY,
|
||||
SourceInfo.fromCurrentStack());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -49,7 +55,9 @@ class ViewRegistryTest {
|
|||
View.builder().setDescription("description").build());
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView));
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
assertThat(
|
||||
viewRegistry.findViews(
|
||||
|
|
@ -81,7 +89,9 @@ class ViewRegistryTest {
|
|||
View.builder().setDescription("description").build());
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView));
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
assertThat(
|
||||
viewRegistry.findViews(
|
||||
|
|
@ -113,7 +123,9 @@ class ViewRegistryTest {
|
|||
View.builder().setDescription("description").build());
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView));
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
assertThat(
|
||||
viewRegistry.findViews(
|
||||
|
|
@ -156,6 +168,7 @@ class ViewRegistryTest {
|
|||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Arrays.asList(registeredView1, registeredView2));
|
||||
|
||||
assertThat(
|
||||
|
|
@ -195,7 +208,9 @@ class ViewRegistryTest {
|
|||
View.builder().setAggregation(Aggregation.explicitBucketHistogram()).build());
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView));
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
assertThat(
|
||||
viewRegistry.findViews(
|
||||
|
|
@ -254,7 +269,10 @@ class ViewRegistryTest {
|
|||
: Aggregation.defaultAggregation();
|
||||
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(defaultAggregationSelector, Collections.singletonList(registeredView));
|
||||
ViewRegistry.create(
|
||||
defaultAggregationSelector,
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
// Counter instrument should result in default view
|
||||
assertThat(
|
||||
|
|
@ -330,7 +348,9 @@ class ViewRegistryTest {
|
|||
View.builder().setAggregation(Aggregation.explicitBucketHistogram()).build());
|
||||
ViewRegistry viewRegistry =
|
||||
ViewRegistry.create(
|
||||
DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView));
|
||||
DefaultAggregationSelector.getDefault(),
|
||||
CardinalityLimitSelector.defaultCardinalityLimitSelector(),
|
||||
Collections.singletonList(registeredView));
|
||||
|
||||
assertThat(
|
||||
viewRegistry.findViews(
|
||||
|
|
|
|||
Loading…
Reference in New Issue