Merge 7ab9b3a567
into b79242d858
This commit is contained in:
commit
a23ac38b06
|
@ -88,7 +88,8 @@ final class SdkMeter implements Meter {
|
|||
private final MeterProviderSharedState meterProviderSharedState;
|
||||
private final InstrumentationScopeInfo instrumentationScopeInfo;
|
||||
private final Map<RegisteredReader, MetricStorageRegistry> readerStorageRegistries;
|
||||
private final boolean meterEnabled;
|
||||
|
||||
private boolean meterEnabled;
|
||||
|
||||
SdkMeter(
|
||||
MeterProviderSharedState meterProviderSharedState,
|
||||
|
@ -103,6 +104,18 @@ final class SdkMeter implements Meter {
|
|||
this.meterEnabled = meterConfig.isEnabled();
|
||||
}
|
||||
|
||||
void updateMeterConfig(MeterConfig meterConfig) {
|
||||
meterEnabled = meterConfig.isEnabled();
|
||||
|
||||
for (RegisteredReader registeredReader : readerStorageRegistries.keySet()) {
|
||||
Collection<MetricStorage> storages =
|
||||
Objects.requireNonNull(readerStorageRegistries.get(registeredReader)).getStorages();
|
||||
for (MetricStorage storage : storages) {
|
||||
storage.setEnabled(meterEnabled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
InstrumentationScopeInfo getInstrumentationScopeInfo() {
|
||||
return instrumentationScopeInfo;
|
||||
|
|
|
@ -52,9 +52,10 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
private final List<MetricProducer> metricProducers;
|
||||
private final MeterProviderSharedState sharedState;
|
||||
private final ComponentRegistry<SdkMeter> registry;
|
||||
private final ScopeConfigurator<MeterConfig> meterConfigurator;
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||
|
||||
private ScopeConfigurator<MeterConfig> meterConfigurator;
|
||||
|
||||
/** Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}. */
|
||||
public static SdkMeterProviderBuilder builder() {
|
||||
return new SdkMeterProviderBuilder();
|
||||
|
@ -105,6 +106,15 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
return meterConfig == null ? MeterConfig.defaultConfig() : meterConfig;
|
||||
}
|
||||
|
||||
void setMeterConfigurator(ScopeConfigurator<MeterConfig> meterConfigurator) {
|
||||
this.meterConfigurator = meterConfigurator;
|
||||
this.registry
|
||||
.getComponents()
|
||||
.forEach(
|
||||
sdkMeter ->
|
||||
sdkMeter.updateMeterConfig(getMeterConfig(sdkMeter.getInstrumentationScopeInfo())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MeterBuilder meterBuilder(String instrumentationScopeName) {
|
||||
if (registeredReaders.isEmpty()) {
|
||||
|
|
|
@ -49,6 +49,19 @@ public final class SdkMeterProviderUtil {
|
|||
return sdkMeterProviderBuilder;
|
||||
}
|
||||
|
||||
/** Reflectively set the {@link ScopeConfigurator} to the {@link SdkMeterProvider}. */
|
||||
public static void setMeterConfigurator(
|
||||
SdkMeterProvider sdkMeterProvider, ScopeConfigurator<MeterConfig> scopeConfigurator) {
|
||||
try {
|
||||
Method method =
|
||||
SdkMeterProvider.class.getDeclaredMethod("setMeterConfigurator", ScopeConfigurator.class);
|
||||
method.setAccessible(true);
|
||||
method.invoke(sdkMeterProvider, scopeConfigurator);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
|
||||
throw new IllegalStateException("Error calling setMeterConfigurator on SdkMeterProvider", e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Reflectively set the {@link ScopeConfigurator} to the {@link SdkMeterProviderBuilder}. */
|
||||
public static SdkMeterProviderBuilder setMeterConfigurator(
|
||||
SdkMeterProviderBuilder sdkMeterProviderBuilder,
|
||||
|
|
|
@ -7,7 +7,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator;
|
|||
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
|
||||
import io.opentelemetry.sdk.metrics.data.ExemplarData;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricDataType;
|
||||
|
@ -27,8 +26,9 @@ import javax.annotation.concurrent.Immutable;
|
|||
@Immutable
|
||||
public interface Aggregator<T extends PointData, U extends ExemplarData> {
|
||||
/** Returns the drop aggregator, an aggregator that drops measurements. */
|
||||
static Aggregator<?, DoubleExemplarData> drop() {
|
||||
return DropAggregator.INSTANCE;
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T extends PointData, U extends ExemplarData> Aggregator<T, U> drop() {
|
||||
return (Aggregator<T, U>) DropAggregator.INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -8,7 +8,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator;
|
|||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
|
||||
import io.opentelemetry.sdk.metrics.data.ExemplarData;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.data.PointData;
|
||||
|
@ -25,7 +24,7 @@ import java.util.List;
|
|||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public final class DropAggregator implements Aggregator<PointData, DoubleExemplarData> {
|
||||
public final class DropAggregator implements Aggregator<PointData, ExemplarData> {
|
||||
|
||||
private static final PointData POINT_DATA =
|
||||
new PointData() {
|
||||
|
@ -50,16 +49,16 @@ public final class DropAggregator implements Aggregator<PointData, DoubleExempla
|
|||
}
|
||||
};
|
||||
|
||||
public static final Aggregator<PointData, DoubleExemplarData> INSTANCE = new DropAggregator();
|
||||
public static final Aggregator<PointData, ExemplarData> INSTANCE = new DropAggregator();
|
||||
|
||||
private static final AggregatorHandle<PointData, DoubleExemplarData> HANDLE =
|
||||
new AggregatorHandle<PointData, DoubleExemplarData>(ExemplarReservoir.doubleNoSamples()) {
|
||||
private static final AggregatorHandle<PointData, ExemplarData> HANDLE =
|
||||
new AggregatorHandle<PointData, ExemplarData>(ExemplarReservoir.anyNoSamples()) {
|
||||
@Override
|
||||
protected PointData doAggregateThenMaybeReset(
|
||||
long startEpochNanos,
|
||||
long epochNanos,
|
||||
Attributes attributes,
|
||||
List<DoubleExemplarData> exemplars,
|
||||
List<ExemplarData> exemplars,
|
||||
boolean reset) {
|
||||
return POINT_DATA;
|
||||
}
|
||||
|
@ -74,7 +73,7 @@ public final class DropAggregator implements Aggregator<PointData, DoubleExempla
|
|||
private DropAggregator() {}
|
||||
|
||||
@Override
|
||||
public AggregatorHandle<PointData, DoubleExemplarData> createHandle() {
|
||||
public AggregatorHandle<PointData, ExemplarData> createHandle() {
|
||||
return HANDLE;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,6 +55,11 @@ public interface ExemplarReservoir<T extends ExemplarData> {
|
|||
return NoopExemplarReservoir.LONG_INSTANCE;
|
||||
}
|
||||
|
||||
/** An exemplar reservoir that stores no exemplars. */
|
||||
static ExemplarReservoir<ExemplarData> anyNoSamples() {
|
||||
return NoopExemplarReservoir.ANY_INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* A double reservoir with fixed size that stores the given number of exemplars.
|
||||
*
|
||||
|
|
|
@ -20,6 +20,7 @@ class NoopExemplarReservoir<T extends ExemplarData> implements ExemplarReservoir
|
|||
new NoopExemplarReservoir<>();
|
||||
static final NoopExemplarReservoir<DoubleExemplarData> DOUBLE_INSTANCE =
|
||||
new NoopExemplarReservoir<>();
|
||||
static final NoopExemplarReservoir<ExemplarData> ANY_INSTANCE = new NoopExemplarReservoir<>();
|
||||
|
||||
private NoopExemplarReservoir() {}
|
||||
|
||||
|
|
|
@ -53,9 +53,11 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
|
|||
private final RegisteredReader registeredReader;
|
||||
private final MetricDescriptor metricDescriptor;
|
||||
private final AggregationTemporality aggregationTemporality;
|
||||
private final Aggregator<T, U> aggregator;
|
||||
private final AttributesProcessor attributesProcessor;
|
||||
private final MemoryMode memoryMode;
|
||||
private final Aggregator<T, U> originalAggregator;
|
||||
|
||||
private Aggregator<T, U> aggregator;
|
||||
|
||||
/**
|
||||
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
|
||||
|
@ -100,6 +102,7 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
|
|||
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
|
||||
this.memoryMode = registeredReader.getReader().getMemoryMode();
|
||||
this.aggregator = aggregator;
|
||||
this.originalAggregator = aggregator;
|
||||
this.attributesProcessor = attributesProcessor;
|
||||
this.maxCardinality = maxCardinality - 1;
|
||||
this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
|
||||
|
@ -309,6 +312,20 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
|
|||
return currentPoints;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
if (enabled) {
|
||||
if (aggregator == Aggregator.drop()) {
|
||||
aggregator = originalAggregator;
|
||||
}
|
||||
} else {
|
||||
aggregator = Aggregator.drop();
|
||||
|
||||
aggregatorHandles.clear();
|
||||
lastPoints.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return aggregator == Aggregator.drop();
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Queue;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
@ -51,8 +52,8 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
private final RegisteredReader registeredReader;
|
||||
private final MetricDescriptor metricDescriptor;
|
||||
private final AggregationTemporality aggregationTemporality;
|
||||
private final Aggregator<T, U> aggregator;
|
||||
private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
|
||||
private final AtomicReference<AggregatorHolder<T, U>> aggregatorHolder;
|
||||
private final Aggregator<T, U> originalAggregator;
|
||||
private final AttributesProcessor attributesProcessor;
|
||||
|
||||
private final MemoryMode memoryMode;
|
||||
|
@ -82,11 +83,12 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
int maxCardinality) {
|
||||
this.registeredReader = registeredReader;
|
||||
this.metricDescriptor = metricDescriptor;
|
||||
this.originalAggregator = aggregator;
|
||||
this.aggregatorHolder = new AtomicReference<>(new AggregatorHolder<>(aggregator));
|
||||
this.aggregationTemporality =
|
||||
registeredReader
|
||||
.getReader()
|
||||
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
|
||||
this.aggregator = aggregator;
|
||||
this.attributesProcessor = attributesProcessor;
|
||||
this.maxCardinality = maxCardinality - 1;
|
||||
this.memoryMode = registeredReader.getReader().getMemoryMode();
|
||||
|
@ -101,8 +103,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
public void recordLong(long value, Attributes attributes, Context context) {
|
||||
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
|
||||
try {
|
||||
AggregatorHandle<T, U> handle =
|
||||
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
|
||||
AggregatorHandle<T, U> handle = getAggregatorHandle(aggregatorHolder, attributes, context);
|
||||
handle.recordLong(value, attributes, context);
|
||||
} finally {
|
||||
releaseHolderForRecord(aggregatorHolder);
|
||||
|
@ -123,17 +124,32 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
}
|
||||
AggregatorHolder<T, U> aggregatorHolder = getHolderForRecord();
|
||||
try {
|
||||
AggregatorHandle<T, U> handle =
|
||||
getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context);
|
||||
AggregatorHandle<T, U> handle = getAggregatorHandle(aggregatorHolder, attributes, context);
|
||||
handle.recordDouble(value, attributes, context);
|
||||
} finally {
|
||||
releaseHolderForRecord(aggregatorHolder);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
if (enabled) {
|
||||
AggregatorHolder<T, U> localAggregatorHolder = Objects.requireNonNull(aggregatorHolder.get());
|
||||
if (localAggregatorHolder.aggregator == Aggregator.drop()) {
|
||||
AggregatorHolder<T, U> newHolder = new AggregatorHolder<>(originalAggregator);
|
||||
// If this fails, another thread called `setEnabled` and we can discard the current call
|
||||
if (aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder)) {
|
||||
previousCollectionAggregatorHandles.clear();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
aggregatorHolder.set(new AggregatorHolder<>(Aggregator.drop()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return true;
|
||||
return Objects.requireNonNull(this.aggregatorHolder.get()).aggregator != Aggregator.drop();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -146,14 +162,15 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
*/
|
||||
private AggregatorHolder<T, U> getHolderForRecord() {
|
||||
do {
|
||||
AggregatorHolder<T, U> aggregatorHolder = this.aggregatorHolder;
|
||||
int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2);
|
||||
AggregatorHolder<T, U> localAggregatorHolder =
|
||||
Objects.requireNonNull(this.aggregatorHolder.get());
|
||||
int recordsInProgress = localAggregatorHolder.activeRecordingThreads.addAndGet(2);
|
||||
if (recordsInProgress % 2 == 0) {
|
||||
return aggregatorHolder;
|
||||
return localAggregatorHolder;
|
||||
} else {
|
||||
// Collect is in progress, decrement recordsInProgress to allow collect to proceed and
|
||||
// re-read aggregatorHolder
|
||||
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
|
||||
localAggregatorHolder.activeRecordingThreads.addAndGet(-2);
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
|
@ -167,16 +184,14 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
}
|
||||
|
||||
private AggregatorHandle<T, U> getAggregatorHandle(
|
||||
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles,
|
||||
Attributes attributes,
|
||||
Context context) {
|
||||
AggregatorHolder<T, U> localAggregatorHolder, Attributes attributes, Context context) {
|
||||
Objects.requireNonNull(attributes, "attributes");
|
||||
attributes = attributesProcessor.process(attributes, context);
|
||||
AggregatorHandle<T, U> handle = aggregatorHandles.get(attributes);
|
||||
AggregatorHandle<T, U> handle = localAggregatorHolder.aggregatorHandles.get(attributes);
|
||||
if (handle != null) {
|
||||
return handle;
|
||||
}
|
||||
if (aggregatorHandles.size() >= maxCardinality) {
|
||||
if (localAggregatorHolder.aggregatorHandles.size() >= maxCardinality) {
|
||||
logger.log(
|
||||
Level.WARNING,
|
||||
"Instrument "
|
||||
|
@ -186,7 +201,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
+ ").");
|
||||
// Return handle for overflow series, first checking if a handle already exists for it
|
||||
attributes = MetricStorage.CARDINALITY_OVERFLOW;
|
||||
handle = aggregatorHandles.get(attributes);
|
||||
handle = localAggregatorHolder.aggregatorHandles.get(attributes);
|
||||
if (handle != null) {
|
||||
return handle;
|
||||
}
|
||||
|
@ -194,9 +209,9 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
// Get handle from pool if available, else create a new one.
|
||||
AggregatorHandle<T, U> newHandle = aggregatorHandlePool.poll();
|
||||
if (newHandle == null) {
|
||||
newHandle = aggregator.createHandle();
|
||||
newHandle = localAggregatorHolder.aggregator.createHandle();
|
||||
}
|
||||
handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
|
||||
handle = localAggregatorHolder.aggregatorHandles.putIfAbsent(attributes, newHandle);
|
||||
return handle != null ? handle : newHandle;
|
||||
}
|
||||
|
||||
|
@ -211,14 +226,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
aggregationTemporality == DELTA
|
||||
? registeredReader.getLastCollectEpochNanos()
|
||||
: startEpochNanos;
|
||||
AggregatorHolder<T, U> holder = Objects.requireNonNull(this.aggregatorHolder.get());
|
||||
|
||||
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
|
||||
if (reset) {
|
||||
AggregatorHolder<T, U> holder = this.aggregatorHolder;
|
||||
this.aggregatorHolder =
|
||||
AggregatorHolder<T, U> newHolder =
|
||||
(memoryMode == REUSABLE_DATA)
|
||||
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
|
||||
: new AggregatorHolder<>();
|
||||
? new AggregatorHolder<>(holder.aggregator, previousCollectionAggregatorHandles)
|
||||
: new AggregatorHolder<>(holder.aggregator);
|
||||
// If this fails, another thread called `setEnabled` and we can discard the current call
|
||||
aggregatorHolder.compareAndSet(holder, newHolder);
|
||||
|
||||
// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
|
||||
// record operations should re-read the volatile this.aggregatorHolder.
|
||||
|
@ -228,10 +245,8 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
while (recordsInProgress > 1) {
|
||||
recordsInProgress = holder.activeRecordingThreads.get();
|
||||
}
|
||||
aggregatorHandles = holder.aggregatorHandles;
|
||||
} else {
|
||||
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
|
||||
}
|
||||
aggregatorHandles = holder.aggregatorHandles;
|
||||
|
||||
List<T> points;
|
||||
if (memoryMode == REUSABLE_DATA) {
|
||||
|
@ -303,7 +318,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
return EmptyMetricData.getInstance();
|
||||
}
|
||||
|
||||
return aggregator.toMetricData(
|
||||
return holder.aggregator.toMetricData(
|
||||
resource, instrumentationScopeInfo, metricDescriptor, points, aggregationTemporality);
|
||||
}
|
||||
|
||||
|
@ -313,6 +328,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
}
|
||||
|
||||
private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
|
||||
private final Aggregator<T, U> aggregator;
|
||||
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
|
||||
// Recording threads grab the current interval (AggregatorHolder) and atomically increment
|
||||
// this by 2 before recording against it (and then decrement by two when done).
|
||||
|
@ -330,12 +346,14 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
|
|||
// and then grab and record against the new current interval (AggregatorHolder).
|
||||
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);
|
||||
|
||||
private AggregatorHolder() {
|
||||
aggregatorHandles = new ConcurrentHashMap<>();
|
||||
private AggregatorHolder(Aggregator<T, U> aggregator) {
|
||||
this(aggregator, new ConcurrentHashMap<>());
|
||||
}
|
||||
|
||||
private AggregatorHolder(
|
||||
Aggregator<T, U> aggregator,
|
||||
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles) {
|
||||
this.aggregator = aggregator;
|
||||
this.aggregatorHandles = aggregatorHandles;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,4 +44,9 @@ final class EmptyMetricStorage implements SynchronousMetricStorage {
|
|||
public boolean isEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,4 +58,6 @@ public interface MetricStorage {
|
|||
default boolean isEmpty() {
|
||||
return this == EmptyMetricStorage.INSTANCE;
|
||||
}
|
||||
|
||||
void setEnabled(boolean enabled);
|
||||
}
|
||||
|
|
|
@ -28,8 +28,10 @@ import io.opentelemetry.context.Context;
|
|||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.internal.ScopeConfigurator;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.MeterConfig;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
|
@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
@ -1030,6 +1033,34 @@ class SdkMeterProviderTest {
|
|||
sum -> sum.isCumulative().hasPointsSatisfying(point -> point.hasValue(1))));
|
||||
}
|
||||
|
||||
private static ScopeConfigurator<MeterConfig> flipConfigurator(boolean enabled) {
|
||||
return scopeInfo -> enabled ? MeterConfig.disabled() : MeterConfig.enabled();
|
||||
}
|
||||
|
||||
@Test
|
||||
void propagatesEnablementToMeterDirectly() {
|
||||
SdkMeterProvider meterProvider =
|
||||
SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build();
|
||||
SdkMeter meter = (SdkMeter) meterProvider.get("test");
|
||||
boolean isEnabled = meter.isMeterEnabled();
|
||||
|
||||
meterProvider.setMeterConfigurator(flipConfigurator(isEnabled));
|
||||
|
||||
Assertions.assertThat(meter.isMeterEnabled()).isEqualTo(!isEnabled);
|
||||
}
|
||||
|
||||
@Test
|
||||
void propagatesEnablementToMeterByUtil() {
|
||||
SdkMeterProvider sdkMeterProvider =
|
||||
SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build();
|
||||
SdkMeter sdkMeter = (SdkMeter) sdkMeterProvider.get("test");
|
||||
boolean isEnabled = sdkMeter.isMeterEnabled();
|
||||
|
||||
SdkMeterProviderUtil.setMeterConfigurator(sdkMeterProvider, flipConfigurator(isEnabled));
|
||||
|
||||
Assertions.assertThat(sdkMeter.isMeterEnabled()).isEqualTo(!isEnabled);
|
||||
}
|
||||
|
||||
private static void registerViewForAllTypes(
|
||||
SdkMeterProviderBuilder meterProviderBuilder, Aggregation aggregation) {
|
||||
for (InstrumentType instrumentType : InstrumentType.values()) {
|
||||
|
|
|
@ -18,6 +18,7 @@ import io.opentelemetry.api.metrics.LongUpDownCounter;
|
|||
import io.opentelemetry.api.metrics.Meter;
|
||||
import io.opentelemetry.api.metrics.MeterProvider;
|
||||
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
|
||||
import io.opentelemetry.sdk.metrics.internal.MeterConfig;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MetricStorageRegistry;
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
|
||||
import java.util.Locale;
|
||||
|
@ -481,4 +482,16 @@ class SdkMeterTest {
|
|||
+ "attributes={}"
|
||||
+ "}}");
|
||||
}
|
||||
|
||||
@Test
|
||||
void updateEnabled() {
|
||||
SdkMeterProvider sdkMeterProvider =
|
||||
SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build();
|
||||
SdkMeter meter = (SdkMeter) sdkMeterProvider.get("test");
|
||||
|
||||
meter.updateMeterConfig(MeterConfig.disabled());
|
||||
assertThat(meter.isMeterEnabled()).isFalse();
|
||||
meter.updateMeterConfig(MeterConfig.enabled());
|
||||
assertThat(meter.isMeterEnabled()).isTrue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -456,4 +456,107 @@ class AsynchronousMetricStorageTest {
|
|||
.anySatisfy(point -> assertThat(point).isSameAs(firstCollectionPoint));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisable_isEmpty(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
|
||||
assertThat(longCounterStorage.isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisableThenEnable_isEmpty(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
longCounterStorage.setEnabled(true);
|
||||
|
||||
assertThat(longCounterStorage.isEmpty()).isFalse();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
|
||||
longCounterStorage.record(Attributes.empty(), 10);
|
||||
|
||||
assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
longCounterStorage.setEnabled(true);
|
||||
|
||||
longCounterStorage.record(Attributes.empty(), 10);
|
||||
|
||||
assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isFalse();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void disableDropsAggregatorState(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
longCounterStorage.record(Attributes.empty(), 10);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
longCounterStorage.setEnabled(true);
|
||||
|
||||
longCounterStorage.record(Attributes.empty(), 5);
|
||||
|
||||
MetricData metricData = longCounterStorage.collect(resource, scope, 0, 0);
|
||||
assertThat(metricData)
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5)));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void collect_DeltaResetAfterDisabled(MemoryMode memoryMode) {
|
||||
setup(memoryMode);
|
||||
|
||||
when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.DELTA);
|
||||
longCounterStorage =
|
||||
AsynchronousMetricStorage.create(
|
||||
registeredReader,
|
||||
registeredView,
|
||||
InstrumentDescriptor.create(
|
||||
"long-counter",
|
||||
"description",
|
||||
"unit",
|
||||
InstrumentType.COUNTER,
|
||||
InstrumentValueType.LONG,
|
||||
Advice.empty()));
|
||||
|
||||
longCounterStorage.setEpochInformation(0, 10);
|
||||
longCounterStorage.record(Attributes.empty(), 5);
|
||||
longCounterStorage.collect(resource, scope, 0, 0);
|
||||
|
||||
longCounterStorage.setEnabled(false);
|
||||
longCounterStorage.setEnabled(true);
|
||||
|
||||
longCounterStorage.setEpochInformation(0, 30);
|
||||
longCounterStorage.record(Attributes.empty(), 4);
|
||||
MetricData metricData = longCounterStorage.collect(resource, scope, 0, 0);
|
||||
assertThat(metricData)
|
||||
.hasLongSumSatisfying(
|
||||
sum ->
|
||||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 4)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -119,5 +119,10 @@ class MetricStorageRegistryTest {
|
|||
public boolean isEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -835,4 +835,117 @@ public class SynchronousMetricStorageTest {
|
|||
|
||||
return argumentsList.stream();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisable_isEnabled(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.setEnabled(false);
|
||||
|
||||
assertThat(storage.isEnabled()).isFalse();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.setEnabled(false);
|
||||
storage.setEnabled(true);
|
||||
|
||||
assertThat(storage.isEnabled()).isTrue();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.setEnabled(false);
|
||||
|
||||
storage.recordDouble(10d, Attributes.empty(), Context.current());
|
||||
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.setEnabled(false);
|
||||
storage.setEnabled(true);
|
||||
|
||||
storage.recordDouble(10d, Attributes.empty(), Context.current());
|
||||
|
||||
assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isFalse();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void disableDropsAggregatorState(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.recordDouble(10d, Attributes.empty(), Context.current());
|
||||
|
||||
storage.setEnabled(false);
|
||||
storage.setEnabled(true);
|
||||
|
||||
storage.recordDouble(5d, Attributes.empty(), Context.current());
|
||||
|
||||
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
|
||||
assertThat(metricData)
|
||||
.hasDoubleSumSatisfying(
|
||||
sum ->
|
||||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5d)));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(MemoryMode.class)
|
||||
void collect_DeltaResetAfterDisabled(MemoryMode memoryMode) {
|
||||
initialize(memoryMode);
|
||||
|
||||
DefaultSynchronousMetricStorage<?, ?> storage =
|
||||
new DefaultSynchronousMetricStorage<>(
|
||||
deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT);
|
||||
|
||||
storage.recordDouble(5d, Attributes.empty(), Context.current());
|
||||
storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10);
|
||||
deltaReader.setLastCollectEpochNanos(10);
|
||||
|
||||
storage.setEnabled(false);
|
||||
storage.setEnabled(true);
|
||||
|
||||
storage.recordDouble(4d, Attributes.empty(), Context.current());
|
||||
MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30);
|
||||
assertThat(metricData)
|
||||
.hasDoubleSumSatisfying(
|
||||
sum ->
|
||||
sum.satisfies(
|
||||
sumData ->
|
||||
assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 4d)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -240,4 +240,68 @@ class MeterConfigTest {
|
|||
Arguments.of(enableStartsWithD, scopeDog, enabled()),
|
||||
Arguments.of(enableStartsWithD, scopeDuck, enabled()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void setScopeConfigurator() {
|
||||
// 1. Initially, configure all meters to be enabled except meterB
|
||||
InMemoryMetricReader reader = InMemoryMetricReader.create();
|
||||
SdkMeterProvider meterProvider =
|
||||
SdkMeterProvider.builder()
|
||||
.addMeterConfiguratorCondition(nameEquals("meterB"), disabled())
|
||||
.registerMetricReader(reader)
|
||||
.build();
|
||||
|
||||
SdkMeter meterA = (SdkMeter) meterProvider.get("meterA");
|
||||
SdkMeter meterB = (SdkMeter) meterProvider.get("meterB");
|
||||
SdkMeter meterC = (SdkMeter) meterProvider.get("meterC");
|
||||
|
||||
// verify isMeterEnabled()
|
||||
assertThat(meterA.isMeterEnabled()).isTrue();
|
||||
assertThat(meterB.isMeterEnabled()).isFalse();
|
||||
assertThat(meterC.isMeterEnabled()).isTrue();
|
||||
|
||||
// verify metrics are emitted as expected
|
||||
meterA.counterBuilder("meterA").build().add(1);
|
||||
meterB.counterBuilder("meterB").build().add(2);
|
||||
meterC.counterBuilder("meterC").build().add(3);
|
||||
assertThat(reader.collectAllMetrics())
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData -> assertThat(metricData).hasName("meterA"),
|
||||
metricData -> assertThat(metricData).hasName("meterC"));
|
||||
|
||||
// 2. Update config to disable all meters
|
||||
meterProvider.setMeterConfigurator(
|
||||
ScopeConfigurator.<MeterConfig>builder().setDefault(disabled()).build());
|
||||
|
||||
// verify isEnabled()
|
||||
assertThat(meterA.isMeterEnabled()).isFalse();
|
||||
assertThat(meterB.isMeterEnabled()).isFalse();
|
||||
assertThat(meterC.isMeterEnabled()).isFalse();
|
||||
|
||||
// verify metrics are emitted as expected
|
||||
meterA.counterBuilder("meterA").build().add(1);
|
||||
meterB.counterBuilder("meterB").build().add(2);
|
||||
meterC.counterBuilder("meterC").build().add(3);
|
||||
assertThat(reader.collectAllMetrics()).isEmpty();
|
||||
|
||||
// 3. Update config to restore original
|
||||
meterProvider.setMeterConfigurator(
|
||||
ScopeConfigurator.<MeterConfig>builder()
|
||||
.addCondition(nameEquals("meterB"), disabled())
|
||||
.build());
|
||||
|
||||
// verify isEnabled()
|
||||
assertThat(meterA.isMeterEnabled()).isTrue();
|
||||
assertThat(meterB.isMeterEnabled()).isFalse();
|
||||
assertThat(meterC.isMeterEnabled()).isTrue();
|
||||
|
||||
// verify metrics are emitted as expected
|
||||
meterA.counterBuilder("meterA").build().add(1);
|
||||
meterB.counterBuilder("meterB").build().add(2);
|
||||
meterC.counterBuilder("meterC").build().add(3);
|
||||
assertThat(reader.collectAllMetrics())
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metricData -> assertThat(metricData).hasName("meterA"),
|
||||
metricData -> assertThat(metricData).hasName("meterC"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue