From a202a215ae7b3614edf1448487539793659aebef Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 22:42:18 +0200 Subject: [PATCH 01/32] swap handle async --- .../state/AsynchronousMetricStorage.java | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index ad68fdd3ec..675866fe20 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -53,7 +53,6 @@ public final class AsynchronousMetricStorage aggregator; private final AttributesProcessor attributesProcessor; private final MemoryMode memoryMode; @@ -71,9 +70,7 @@ public final class AsynchronousMetricStorage reusablePointsPool; - private final ObjectPool> reusableHandlesPool; private final Function> handleBuilder; - private final BiConsumer> handleReleaser; private final BiConsumer pointReleaser; private final List reusablePointsList = new ArrayList<>(); @@ -81,6 +78,10 @@ public final class AsynchronousMetricStorage reusablePointsMap = new PooledHashMap<>(); + // deliberately not volatile because of performance concerns + // - which means its eventually consistent + private AggregatorHolder aggregatorHolder; + // Time information relative to recording of data in aggregatorHandles, set while calling // callbacks private long startEpochNanos; @@ -99,15 +100,14 @@ public final class AsynchronousMetricStorage(aggregator::createReusablePoint); - this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); - this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); - this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); + this.aggregatorHolder = new AggregatorHolder(aggregator); + this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject(); + if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); this.aggregatorHandles = new PooledHashMap<>(); @@ -143,6 +143,10 @@ public final class AsynchronousMetricStorage aggregator) { + this.aggregatorHolder = new AggregatorHolder(aggregator); + } + /** Record callback measurement from {@link ObservableLongMeasurement}. */ void record(Attributes attributes, long value) { attributes = validateAndProcessAttributes(attributes); @@ -198,20 +202,24 @@ public final class AsynchronousMetricStorage result = aggregationTemporality == AggregationTemporality.DELTA ? collectWithDeltaAggregationTemporality() : collectWithCumulativeAggregationTemporality(); // collectWith*AggregationTemporality() methods are responsible for resetting the handle - aggregatorHandles.forEach(handleReleaser); + aggregatorHandles.forEach(localAggregatorHolder.handleReleaser); aggregatorHandles.clear(); - return aggregator.toMetricData( + return localAggregatorHolder.aggregator.toMetricData( resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); } private Collection collectWithDeltaAggregationTemporality() { + AggregatorHolder localAggregatorHolder = aggregatorHolder; + Map currentPoints; if (memoryMode == REUSABLE_DATA) { // deltaPoints computed in the previous collection can be released @@ -234,7 +242,7 @@ public final class AsynchronousMetricStorage aggregator; + private final ObjectPool> reusableHandlesPool; + private final BiConsumer> handleReleaser; + + private AggregatorHolder(Aggregator aggregator) { + this.aggregator = aggregator; + this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); + this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); + } } } From c83804d53c538a5f50f7f2d4c2224d913d0e25ba Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:01:20 +0200 Subject: [PATCH 02/32] static --- .../internal/state/AsynchronousMetricStorage.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 675866fe20..27863c6496 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -80,7 +80,7 @@ public final class AsynchronousMetricStorage aggregatorHolder; // Time information relative to recording of data in aggregatorHandles, set while calling // callbacks @@ -105,7 +105,7 @@ public final class AsynchronousMetricStorage(aggregator::createReusablePoint); this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); - this.aggregatorHolder = new AggregatorHolder(aggregator); + this.aggregatorHolder = new AggregatorHolder<>(aggregator); this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject(); if (memoryMode == REUSABLE_DATA) { @@ -144,7 +144,7 @@ public final class AsynchronousMetricStorage aggregator) { - this.aggregatorHolder = new AggregatorHolder(aggregator); + this.aggregatorHolder = new AggregatorHolder<>(aggregator); } /** Record callback measurement from {@link ObservableLongMeasurement}. */ @@ -202,7 +202,7 @@ public final class AsynchronousMetricStorage localAggregatorHolder = aggregatorHolder; Collection result = aggregationTemporality == AggregationTemporality.DELTA @@ -218,7 +218,7 @@ public final class AsynchronousMetricStorage collectWithDeltaAggregationTemporality() { - AggregatorHolder localAggregatorHolder = aggregatorHolder; + AggregatorHolder localAggregatorHolder = aggregatorHolder; Map currentPoints; if (memoryMode == REUSABLE_DATA) { @@ -322,7 +322,7 @@ public final class AsynchronousMetricStorage { private final Aggregator aggregator; private final ObjectPool> reusableHandlesPool; private final BiConsumer> handleReleaser; From f4a40932f44dcc35ec3792eb74ad51d242976e4d Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:09:40 +0200 Subject: [PATCH 03/32] atomicref --- .../DefaultSynchronousMetricStorage.java | 65 ++++++++++--------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index c0deda9d06..b34d140b34 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -32,6 +32,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,8 +52,7 @@ public final class DefaultSynchronousMetricStorage aggregator; - private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); + private final AtomicReference> aggregatorHolder; private final AttributesProcessor attributesProcessor; private final MemoryMode memoryMode; @@ -82,16 +82,20 @@ public final class DefaultSynchronousMetricStorage(new AggregatorHolder<>(aggregator)); this.aggregationTemporality = registeredReader .getReader() .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); - this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; this.maxCardinality = maxCardinality - 1; this.memoryMode = registeredReader.getReader().getMemoryMode(); } + void swapAggregator(Aggregator aggregator) { + this.aggregatorHolder.set(new AggregatorHolder<>(aggregator)); + } + // Visible for testing Queue> getAggregatorHandlePool() { return aggregatorHandlePool; @@ -101,8 +105,7 @@ public final class DefaultSynchronousMetricStorage aggregatorHolder = getHolderForRecord(); try { - AggregatorHandle handle = - getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); + AggregatorHandle handle = getAggregatorHandle(aggregatorHolder, attributes, context); handle.recordLong(value, attributes, context); } finally { releaseHolderForRecord(aggregatorHolder); @@ -123,8 +126,7 @@ public final class DefaultSynchronousMetricStorage aggregatorHolder = getHolderForRecord(); try { - AggregatorHandle handle = - getAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); + AggregatorHandle handle = getAggregatorHandle(aggregatorHolder, attributes, context); handle.recordDouble(value, attributes, context); } finally { releaseHolderForRecord(aggregatorHolder); @@ -146,14 +148,14 @@ public final class DefaultSynchronousMetricStorage getHolderForRecord() { do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); + AggregatorHolder localAggregatorHolder = this.aggregatorHolder.get(); + int recordsInProgress = localAggregatorHolder.activeRecordingThreads.addAndGet(2); if (recordsInProgress % 2 == 0) { - return aggregatorHolder; + return localAggregatorHolder; } else { // Collect is in progress, decrement recordsInProgress to allow collect to proceed and // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + localAggregatorHolder.activeRecordingThreads.addAndGet(-2); } } while (true); } @@ -162,21 +164,19 @@ public final class DefaultSynchronousMetricStorage aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + private void releaseHolderForRecord(AggregatorHolder localAggregatorHolder) { + localAggregatorHolder.activeRecordingThreads.addAndGet(-2); } private AggregatorHandle getAggregatorHandle( - ConcurrentHashMap> aggregatorHandles, - Attributes attributes, - Context context) { + AggregatorHolder localAggregatorHolder, Attributes attributes, Context context) { Objects.requireNonNull(attributes, "attributes"); attributes = attributesProcessor.process(attributes, context); - AggregatorHandle handle = aggregatorHandles.get(attributes); + AggregatorHandle handle = localAggregatorHolder.aggregatorHandles.get(attributes); if (handle != null) { return handle; } - if (aggregatorHandles.size() >= maxCardinality) { + if (localAggregatorHolder.aggregatorHandles.size() >= maxCardinality) { logger.log( Level.WARNING, "Instrument " @@ -186,7 +186,7 @@ public final class DefaultSynchronousMetricStorage newHandle = aggregatorHandlePool.poll(); if (newHandle == null) { - newHandle = aggregator.createHandle(); + newHandle = localAggregatorHolder.aggregator.createHandle(); } - handle = aggregatorHandles.putIfAbsent(attributes, newHandle); + handle = localAggregatorHolder.aggregatorHandles.putIfAbsent(attributes, newHandle); return handle != null ? handle : newHandle; } @@ -211,14 +211,16 @@ public final class DefaultSynchronousMetricStorage holder = this.aggregatorHolder.get(); ConcurrentHashMap> aggregatorHandles; if (reset) { - AggregatorHolder holder = this.aggregatorHolder; - this.aggregatorHolder = + AggregatorHolder newHolder = (memoryMode == REUSABLE_DATA) - ? new AggregatorHolder<>(previousCollectionAggregatorHandles) - : new AggregatorHolder<>(); + ? new AggregatorHolder<>(holder.aggregator, previousCollectionAggregatorHandles) + : new AggregatorHolder<>(holder.aggregator); + // Otherwise, swapAggregator was called and the update should be ignored + aggregatorHolder.compareAndSet(holder, newHolder); // Increment recordsInProgress by 1, which produces an odd number acting as a signal that // record operations should re-read the volatile this.aggregatorHolder. @@ -228,10 +230,8 @@ public final class DefaultSynchronousMetricStorage 1) { recordsInProgress = holder.activeRecordingThreads.get(); } - aggregatorHandles = holder.aggregatorHandles; - } else { - aggregatorHandles = this.aggregatorHolder.aggregatorHandles; } + aggregatorHandles = holder.aggregatorHandles; List points; if (memoryMode == REUSABLE_DATA) { @@ -303,7 +303,7 @@ public final class DefaultSynchronousMetricStorage { + private final Aggregator aggregator; private final ConcurrentHashMap> aggregatorHandles; // Recording threads grab the current interval (AggregatorHolder) and atomically increment // this by 2 before recording against it (and then decrement by two when done). @@ -330,12 +331,14 @@ public final class DefaultSynchronousMetricStorage(); + private AggregatorHolder(Aggregator aggregator) { + this(aggregator, new ConcurrentHashMap<>()); } private AggregatorHolder( + Aggregator aggregator, ConcurrentHashMap> aggregatorHandles) { + this.aggregator = aggregator; this.aggregatorHandles = aggregatorHandles; } } From 7039b427797b7026ef6238d433ea0a7f63ba89d3 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:13:08 +0200 Subject: [PATCH 04/32] set meter config --- .../java/io/opentelemetry/sdk/metrics/SdkMeter.java | 7 ++++++- .../opentelemetry/sdk/metrics/SdkMeterProvider.java | 12 +++++++++++- .../sdk/metrics/internal/SdkMeterProviderUtil.java | 13 +++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java index c0f1476077..71a907f7ff 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java @@ -88,7 +88,8 @@ final class SdkMeter implements Meter { private final MeterProviderSharedState meterProviderSharedState; private final InstrumentationScopeInfo instrumentationScopeInfo; private final Map readerStorageRegistries; - private final boolean meterEnabled; + + private boolean meterEnabled; SdkMeter( MeterProviderSharedState meterProviderSharedState, @@ -103,6 +104,10 @@ final class SdkMeter implements Meter { this.meterEnabled = meterConfig.isEnabled(); } + void updateMeterConfig(MeterConfig meterConfig) { + meterEnabled = meterConfig.isEnabled(); + } + // Visible for testing InstrumentationScopeInfo getInstrumentationScopeInfo() { return instrumentationScopeInfo; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index 30ae0b1da5..a143aa0ddb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -52,9 +52,19 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { private final List metricProducers; private final MeterProviderSharedState sharedState; private final ComponentRegistry registry; - private final ScopeConfigurator meterConfigurator; private final AtomicBoolean isClosed = new AtomicBoolean(false); + private ScopeConfigurator meterConfigurator; + + void setMeterConfigurator(ScopeConfigurator meterConfigurator) { + this.meterConfigurator = meterConfigurator; + this.registry + .getComponents() + .forEach( + sdkMeter -> + sdkMeter.updateMeterConfig(getMeterConfig(sdkMeter.getInstrumentationScopeInfo()))); + } + /** Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}. */ public static SdkMeterProviderBuilder builder() { return new SdkMeterProviderBuilder(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java index 9fc690366e..c324900ab7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java @@ -49,6 +49,19 @@ public final class SdkMeterProviderUtil { return sdkMeterProviderBuilder; } + /** Reflectively set the {@link ScopeConfigurator} to the {@link SdkMeterProvider}. */ + public static void setMeterConfigurator( + SdkMeterProvider sdkMeterProvider, ScopeConfigurator scopeConfigurator) { + try { + Method method = + SdkMeterProvider.class.getDeclaredMethod("setMeterConfigurator", ScopeConfigurator.class); + method.setAccessible(true); + method.invoke(sdkMeterProvider, scopeConfigurator); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException("Error calling setMeterConfigurator on SdkMeterProvider", e); + } + } + /** Reflectively set the {@link ScopeConfigurator} to the {@link SdkMeterProviderBuilder}. */ public static SdkMeterProviderBuilder setMeterConfigurator( SdkMeterProviderBuilder sdkMeterProviderBuilder, From 0bd9d8638705fa5f4393d574defc208f59edaf8b Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:45:27 +0200 Subject: [PATCH 05/32] null checks --- .../internal/state/DefaultSynchronousMetricStorage.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index b34d140b34..274d360d0c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -148,7 +148,8 @@ public final class DefaultSynchronousMetricStorage getHolderForRecord() { do { - AggregatorHolder localAggregatorHolder = this.aggregatorHolder.get(); + AggregatorHolder localAggregatorHolder = + Objects.requireNonNull(this.aggregatorHolder.get()); int recordsInProgress = localAggregatorHolder.activeRecordingThreads.addAndGet(2); if (recordsInProgress % 2 == 0) { return localAggregatorHolder; @@ -211,7 +212,7 @@ public final class DefaultSynchronousMetricStorage holder = this.aggregatorHolder.get(); + AggregatorHolder holder = Objects.requireNonNull(this.aggregatorHolder.get()); ConcurrentHashMap> aggregatorHandles; if (reset) { From 73a42da58a0576b182c44887abafb48ab44d6942 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:45:37 +0200 Subject: [PATCH 06/32] generic drop agg --- .../sdk/metrics/internal/aggregator/Aggregator.java | 6 +++--- .../metrics/internal/aggregator/DropAggregator.java | 13 ++++++------- .../internal/exemplar/ExemplarReservoir.java | 5 +++++ .../internal/exemplar/NoopExemplarReservoir.java | 1 + 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java index db167bab5b..1e23b5213a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java @@ -7,7 +7,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricDataType; @@ -27,8 +26,9 @@ import javax.annotation.concurrent.Immutable; @Immutable public interface Aggregator { /** Returns the drop aggregator, an aggregator that drops measurements. */ - static Aggregator drop() { - return DropAggregator.INSTANCE; + @SuppressWarnings("unchecked") + static Aggregator drop() { + return (Aggregator) DropAggregator.INSTANCE; } /** diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java index 28336b8fb4..ed9ff899ce 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DropAggregator.java @@ -8,7 +8,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; @@ -25,7 +24,7 @@ import java.util.List; *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DropAggregator implements Aggregator { +public final class DropAggregator implements Aggregator { private static final PointData POINT_DATA = new PointData() { @@ -50,16 +49,16 @@ public final class DropAggregator implements Aggregator INSTANCE = new DropAggregator(); + public static final Aggregator INSTANCE = new DropAggregator(); - private static final AggregatorHandle HANDLE = - new AggregatorHandle(ExemplarReservoir.doubleNoSamples()) { + private static final AggregatorHandle HANDLE = + new AggregatorHandle(ExemplarReservoir.anyNoSamples()) { @Override protected PointData doAggregateThenMaybeReset( long startEpochNanos, long epochNanos, Attributes attributes, - List exemplars, + List exemplars, boolean reset) { return POINT_DATA; } @@ -74,7 +73,7 @@ public final class DropAggregator implements Aggregator createHandle() { + public AggregatorHandle createHandle() { return HANDLE; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java index c0de6e0df6..ac4a4b6b06 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoir.java @@ -55,6 +55,11 @@ public interface ExemplarReservoir { return NoopExemplarReservoir.LONG_INSTANCE; } + /** An exemplar reservoir that stores no exemplars. */ + static ExemplarReservoir anyNoSamples() { + return NoopExemplarReservoir.ANY_INSTANCE; + } + /** * A double reservoir with fixed size that stores the given number of exemplars. * diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java index ec08dd7db4..e28b57a38e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/NoopExemplarReservoir.java @@ -20,6 +20,7 @@ class NoopExemplarReservoir implements ExemplarReservoir new NoopExemplarReservoir<>(); static final NoopExemplarReservoir DOUBLE_INSTANCE = new NoopExemplarReservoir<>(); + static final NoopExemplarReservoir ANY_INSTANCE = new NoopExemplarReservoir<>(); private NoopExemplarReservoir() {} From 3e4ef5cc370da6f4c0d51d43f7215904f31ca26d Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:55:16 +0200 Subject: [PATCH 07/32] setEnabled --- .../metrics/internal/state/AsynchronousMetricStorage.java | 8 ++++++-- .../internal/state/DefaultSynchronousMetricStorage.java | 8 ++++++-- .../sdk/metrics/internal/state/EmptyMetricStorage.java | 5 +++++ .../sdk/metrics/internal/state/MetricStorage.java | 2 ++ .../metrics/internal/state/WriteableMetricStorage.java | 2 ++ .../metrics/internal/state/MetricStorageRegistryTest.java | 5 +++++ 6 files changed, 26 insertions(+), 4 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 27863c6496..26e440d31d 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -81,6 +81,7 @@ public final class AsynchronousMetricStorage aggregatorHolder; + private final Aggregator originalAggregator; // Time information relative to recording of data in aggregatorHandles, set while calling // callbacks @@ -106,6 +107,7 @@ public final class AsynchronousMetricStorage reusablePointsPool.returnObject(point); this.aggregatorHolder = new AggregatorHolder<>(aggregator); + this.originalAggregator = aggregator; this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject(); if (memoryMode == REUSABLE_DATA) { @@ -143,8 +145,10 @@ public final class AsynchronousMetricStorage aggregator) { - this.aggregatorHolder = new AggregatorHolder<>(aggregator); + @Override + public void setEnabled(boolean enabled) { + this.aggregatorHolder = + new AggregatorHolder<>(enabled ? originalAggregator : Aggregator.drop()); } /** Record callback measurement from {@link ObservableLongMeasurement}. */ diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 274d360d0c..0d0e65d65c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -53,6 +53,7 @@ public final class DefaultSynchronousMetricStorage> aggregatorHolder; + private final Aggregator originalAggregator; private final AttributesProcessor attributesProcessor; private final MemoryMode memoryMode; @@ -82,6 +83,7 @@ public final class DefaultSynchronousMetricStorage(new AggregatorHolder<>(aggregator)); this.aggregationTemporality = registeredReader @@ -92,8 +94,10 @@ public final class DefaultSynchronousMetricStorage aggregator) { - this.aggregatorHolder.set(new AggregatorHolder<>(aggregator)); + @Override + public void setEnabled(boolean enabled) { + this.aggregatorHolder.set( + new AggregatorHolder<>(enabled ? originalAggregator : Aggregator.drop())); } // Visible for testing diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java index faaa7087c7..9a0ed20787 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java @@ -44,4 +44,9 @@ final class EmptyMetricStorage implements SynchronousMetricStorage { public boolean isEnabled() { return false; } + + @Override + public void setEnabled(boolean enabled) { + // do nothing + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java index b31852cda9..9df7abace7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java @@ -58,4 +58,6 @@ public interface MetricStorage { default boolean isEmpty() { return this == EmptyMetricStorage.INSTANCE; } + + void setEnabled(boolean enabled); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java index 7191a63f1e..d54358aa4c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java @@ -28,4 +28,6 @@ public interface WriteableMetricStorage { * otherwise (i.e. noop / empty metric storage is installed). */ boolean isEnabled(); + + void setEnabled(boolean enabled); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java index 99a26106a7..9f91d13fe2 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java @@ -119,5 +119,10 @@ class MetricStorageRegistryTest { public boolean isEnabled() { return true; } + + @Override + public void setEnabled(boolean enabled) { + throw new UnsupportedOperationException("Not implemented"); + } } } From 9136311f389806b76b3bd320ff022ea37bc08767 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Mon, 12 May 2025 23:56:45 +0200 Subject: [PATCH 08/32] disable storages --- .../main/java/io/opentelemetry/sdk/metrics/SdkMeter.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java index 71a907f7ff..ae940d457c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java @@ -106,6 +106,14 @@ final class SdkMeter implements Meter { void updateMeterConfig(MeterConfig meterConfig) { meterEnabled = meterConfig.isEnabled(); + + for (RegisteredReader registeredReader : readerStorageRegistries.keySet()) { + Collection storages = + Objects.requireNonNull(readerStorageRegistries.get(registeredReader)).getStorages(); + for (MetricStorage storage : storages) { + storage.setEnabled(meterEnabled); + } + } } // Visible for testing From 6de7e980552a6f7fc9638cf291358729098639c0 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Tue, 13 May 2025 00:00:41 +0200 Subject: [PATCH 09/32] ops --- .../sdk/metrics/internal/state/WriteableMetricStorage.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java index d54358aa4c..7191a63f1e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/WriteableMetricStorage.java @@ -28,6 +28,4 @@ public interface WriteableMetricStorage { * otherwise (i.e. noop / empty metric storage is installed). */ boolean isEnabled(); - - void setEnabled(boolean enabled); } From b44530cc269c25da5f2f49739057f60570db3d32 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Tue, 13 May 2025 00:05:58 +0200 Subject: [PATCH 10/32] move down --- .../sdk/metrics/SdkMeterProvider.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index a143aa0ddb..a2a076fd9a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -56,15 +56,6 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { private ScopeConfigurator meterConfigurator; - void setMeterConfigurator(ScopeConfigurator meterConfigurator) { - this.meterConfigurator = meterConfigurator; - this.registry - .getComponents() - .forEach( - sdkMeter -> - sdkMeter.updateMeterConfig(getMeterConfig(sdkMeter.getInstrumentationScopeInfo()))); - } - /** Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}. */ public static SdkMeterProviderBuilder builder() { return new SdkMeterProviderBuilder(); @@ -115,6 +106,15 @@ public final class SdkMeterProvider implements MeterProvider, Closeable { return meterConfig == null ? MeterConfig.defaultConfig() : meterConfig; } + void setMeterConfigurator(ScopeConfigurator meterConfigurator) { + this.meterConfigurator = meterConfigurator; + this.registry + .getComponents() + .forEach( + sdkMeter -> + sdkMeter.updateMeterConfig(getMeterConfig(sdkMeter.getInstrumentationScopeInfo()))); + } + @Override public MeterBuilder meterBuilder(String instrumentationScopeName) { if (registeredReaders.isEmpty()) { From d9e39f120fa954ddc4f6324d813acba301259cc4 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Tue, 13 May 2025 00:22:59 +0200 Subject: [PATCH 11/32] unit tests --- .../sdk/metrics/SdkMeterProviderTest.java | 31 +++++++++++++++++++ .../sdk/metrics/SdkMeterTest.java | 13 ++++++++ 2 files changed, 44 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java index 479dcd6208..010ec042f3 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java @@ -28,8 +28,10 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.internal.ScopeConfigurator; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.MeterConfig; import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry; import io.opentelemetry.sdk.resources.Resource; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -1030,6 +1033,34 @@ class SdkMeterProviderTest { sum -> sum.isCumulative().hasPointsSatisfying(point -> point.hasValue(1)))); } + private static ScopeConfigurator flipConfigurator(boolean enabled) { + return scopeInfo -> enabled ? MeterConfig.disabled() : MeterConfig.enabled(); + } + + @Test + void propagatesEnablementToLoggerDirectly() { + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build(); + SdkMeter meter = (SdkMeter) meterProvider.get("test"); + boolean isEnabled = meter.isMeterEnabled(); + + meterProvider.setMeterConfigurator(flipConfigurator(isEnabled)); + + Assertions.assertThat(meter.isMeterEnabled()).isEqualTo(!isEnabled); + } + + @Test + void propagatesEnablementToLoggerByUtil() { + SdkMeterProvider sdkMeterProvider = + SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build(); + SdkMeter sdkMeter = (SdkMeter) sdkMeterProvider.get("test"); + boolean isEnabled = sdkMeter.isMeterEnabled(); + + SdkMeterProviderUtil.setMeterConfigurator(sdkMeterProvider, flipConfigurator(isEnabled)); + + Assertions.assertThat(sdkMeter.isMeterEnabled()).isEqualTo(!isEnabled); + } + private static void registerViewForAllTypes( SdkMeterProviderBuilder meterProviderBuilder, Aggregation aggregation) { for (InstrumentType instrumentType : InstrumentType.values()) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterTest.java index 468d0121e2..e38d3180bd 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterTest.java @@ -18,6 +18,7 @@ import io.opentelemetry.api.metrics.LongUpDownCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.metrics.internal.MeterConfig; import io.opentelemetry.sdk.metrics.internal.state.MetricStorageRegistry; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.util.Locale; @@ -481,4 +482,16 @@ class SdkMeterTest { + "attributes={}" + "}}"); } + + @Test + void updateEnabled() { + SdkMeterProvider sdkMeterProvider = + SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build(); + SdkMeter meter = (SdkMeter) sdkMeterProvider.get("test"); + + meter.updateMeterConfig(MeterConfig.disabled()); + assertThat(meter.isMeterEnabled()).isFalse(); + meter.updateMeterConfig(MeterConfig.enabled()); + assertThat(meter.isMeterEnabled()).isTrue(); + } } From c6c8f30ed569e3993a20afc951a9bd3104ddb495 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Tue, 13 May 2025 00:49:03 +0200 Subject: [PATCH 12/32] fix stuff and order --- .../internal/state/AsynchronousMetricStorage.java | 13 +++++++------ .../state/DefaultSynchronousMetricStorage.java | 15 ++++++++------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 26e440d31d..769b4de7dc 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -145,12 +145,6 @@ public final class AsynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop()); - } - /** Record callback measurement from {@link ObservableLongMeasurement}. */ void record(Attributes attributes, long value) { attributes = validateAndProcessAttributes(attributes); @@ -321,12 +315,19 @@ public final class AsynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop()); + } + @Override public boolean isEmpty() { return aggregatorHolder.aggregator == Aggregator.drop(); } private static final class AggregatorHolder { + private final Aggregator aggregator; private final ObjectPool> reusableHandlesPool; private final BiConsumer> handleReleaser; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 0d0e65d65c..5c44a4236c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -94,13 +94,8 @@ public final class DefaultSynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop())); - } - // Visible for testing + Queue> getAggregatorHandlePool() { return aggregatorHandlePool; } @@ -137,9 +132,15 @@ public final class DefaultSynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop())); + } + @Override public boolean isEnabled() { - return true; + return this.aggregatorHolder.get().aggregator == Aggregator.drop(); } /** From 3b6f149ae1c7de731be41ccd6fc53028d36a21a7 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Fri, 16 May 2025 09:58:06 +0000 Subject: [PATCH 13/32] fix cmpl --- .../internal/state/DefaultSynchronousMetricStorage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 5c44a4236c..4214f83a11 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -140,7 +140,8 @@ public final class DefaultSynchronousMetricStorage holder = this.aggregatorHolder.get(); + return holder != null && holder.aggregator != Aggregator.drop(); } /** From 285d43c567312a0ea869d455bde31a184a9cb154 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 21 May 2025 23:29:43 +0200 Subject: [PATCH 14/32] optimize setEnabled --- .../state/DefaultSynchronousMetricStorage.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 5c44a4236c..6cfa21a762 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -134,8 +134,16 @@ public final class DefaultSynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop())); + if (enabled) { + AggregatorHolder localAggregatorHolder = aggregatorHolder.get(); + if (localAggregatorHolder.aggregator == Aggregator.drop()) { + AggregatorHolder newHolder = new AggregatorHolder<>(originalAggregator); + // If this fails, another thread called `setEnabled` and we can discard the current call + aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder); + } + } else { + aggregatorHolder.set(new AggregatorHolder<>(Aggregator.drop())); + } } @Override From e656bd9699f9335e3ad69b0113ecb855131887f7 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 21 May 2025 23:35:52 +0200 Subject: [PATCH 15/32] optimize setEnabled --- .../internal/state/AsynchronousMetricStorage.java | 9 +++++++-- .../internal/state/DefaultSynchronousMetricStorage.java | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 769b4de7dc..b8a1450156 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -317,8 +317,13 @@ public final class AsynchronousMetricStorage(enabled ? originalAggregator : Aggregator.drop()); + if (enabled) { + if (aggregatorHolder.aggregator == Aggregator.drop()) { + aggregatorHolder = new AggregatorHolder<>(originalAggregator); + } + } else { + aggregatorHolder = new AggregatorHolder<>(Aggregator.drop()); + } } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 6cfa21a762..18eee5ae08 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -178,8 +178,8 @@ public final class DefaultSynchronousMetricStorage localAggregatorHolder) { - localAggregatorHolder.activeRecordingThreads.addAndGet(-2); + private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { + aggregatorHolder.activeRecordingThreads.addAndGet(-2); } private AggregatorHandle getAggregatorHandle( From 21e188e5e74947869ec8d3426cedc4ff053e3264 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Wed, 21 May 2025 23:59:55 +0200 Subject: [PATCH 16/32] fix test name --- .../io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java index 010ec042f3..571c149614 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java @@ -1038,7 +1038,7 @@ class SdkMeterProviderTest { } @Test - void propagatesEnablementToLoggerDirectly() { + void propagatesEnablementToMeterDirectly() { SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build(); SdkMeter meter = (SdkMeter) meterProvider.get("test"); @@ -1050,7 +1050,7 @@ class SdkMeterProviderTest { } @Test - void propagatesEnablementToLoggerByUtil() { + void propagatesEnablementToMeterByUtil() { SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().registerMetricReader(InMemoryMetricReader.create()).build(); SdkMeter sdkMeter = (SdkMeter) sdkMeterProvider.get("test"); From 03d159df0fadb0fd007c89cb4c16ea461e7e8e76 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 00:10:07 +0200 Subject: [PATCH 17/32] nonnnull --- .../internal/state/DefaultSynchronousMetricStorage.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 18eee5ae08..ddc5904254 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -135,7 +135,7 @@ public final class DefaultSynchronousMetricStorage localAggregatorHolder = aggregatorHolder.get(); + AggregatorHolder localAggregatorHolder = Objects.requireNonNull(aggregatorHolder.get()); if (localAggregatorHolder.aggregator == Aggregator.drop()) { AggregatorHolder newHolder = new AggregatorHolder<>(originalAggregator); // If this fails, another thread called `setEnabled` and we can discard the current call @@ -148,7 +148,7 @@ public final class DefaultSynchronousMetricStorage Date: Thu, 22 May 2025 00:31:06 +0200 Subject: [PATCH 18/32] new test --- .../sdk/metrics/MeterConfigTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java b/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java index aaaeaf5f85..1758aa221e 100644 --- a/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java +++ b/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java @@ -240,4 +240,68 @@ class MeterConfigTest { Arguments.of(enableStartsWithD, scopeDog, enabled()), Arguments.of(enableStartsWithD, scopeDuck, enabled())); } + + @Test + void setScopeConfigurator() { + // 1. Initially, configure all meters to be enabled except meterB + InMemoryMetricReader reader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder() + .addMeterConfiguratorCondition(nameEquals("meterB"), disabled()) + .registerMetricReader(reader) + .build(); + + SdkMeter meterA = (SdkMeter) meterProvider.get("meterA"); + SdkMeter meterB = (SdkMeter) meterProvider.get("meterB"); + SdkMeter meterC = (SdkMeter) meterProvider.get("meterC"); + + // verify isMeterEnabled() + assertThat(meterA.isMeterEnabled()).isTrue(); + assertThat(meterB.isMeterEnabled()).isFalse(); + assertThat(meterC.isMeterEnabled()).isTrue(); + + // verify metrics are emitted as expected + meterA.counterBuilder("meterA").build().add(1); + meterB.counterBuilder("meterB").build().add(2); + meterC.counterBuilder("meterC").build().add(3); + assertThat(reader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metricData -> assertThat(metricData).hasName("meterA"), + metricData -> assertThat(metricData).hasName("meterC")); + + // 2. Update config to disable all meters + meterProvider.setMeterConfigurator( + ScopeConfigurator.builder().setDefault(disabled()).build()); + + // verify isEnabled() + assertThat(meterA.isMeterEnabled()).isFalse(); + assertThat(meterB.isMeterEnabled()).isFalse(); + assertThat(meterC.isMeterEnabled()).isFalse(); + + // verify metrics are emitted as expected + meterA.counterBuilder("meterA").build().add(1); + meterB.counterBuilder("meterB").build().add(2); + meterC.counterBuilder("meterC").build().add(3); + assertThat(reader.collectAllMetrics()).isEmpty(); + + // 3. Update config to restore original + meterProvider.setMeterConfigurator( + ScopeConfigurator.builder() + .addCondition(nameEquals("meterB"), disabled()) + .build()); + + // verify isEnabled() + assertThat(meterA.isMeterEnabled()).isTrue(); + assertThat(meterB.isMeterEnabled()).isFalse(); + assertThat(meterC.isMeterEnabled()).isTrue(); + + // verify metrics are emitted as expected + meterA.counterBuilder("meterA").build().add(1); + meterB.counterBuilder("meterB").build().add(2); + meterC.counterBuilder("meterC").build().add(3); + assertThat(reader.collectAllMetrics()) + .satisfiesExactly( + metricData -> assertThat(metricData).hasName("meterA"), + metricData -> assertThat(metricData).hasName("meterC")); + } } From b00d72628f0552384754c133d2c26384f998902b Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 00:48:22 +0200 Subject: [PATCH 19/32] fix isEnabled --- .../metrics/internal/state/DefaultSynchronousMetricStorage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index ddc5904254..41494ce1ce 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -148,7 +148,7 @@ public final class DefaultSynchronousMetricStorage Date: Thu, 22 May 2025 00:48:29 +0200 Subject: [PATCH 20/32] new tests --- .../state/SynchronousMetricStorageTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 0117af20b5..59833be108 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -835,4 +835,62 @@ public class SynchronousMetricStorageTest { return argumentsList.stream(); } + + @Test + void enabledThenDisable_isEnabled() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.setEnabled(false); + + assertThat(storage.isEnabled()).isFalse(); + } + + @Test + void enabledThenDisableThenEnable_isEnabled() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.setEnabled(false); + storage.setEnabled(true); + + assertThat(storage.isEnabled()).isTrue(); + } + + @Test + void enabledThenDisable_recordAndCollect() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.setEnabled(false); + + storage.recordDouble(10d, Attributes.empty(), Context.current()); + + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isTrue(); + } + + @Test + void enabledThenDisableThenEnable_recordAndCollect() { + initialize(MemoryMode.REUSABLE_DATA); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.setEnabled(false); + storage.setEnabled(true); + + storage.recordDouble(10d, Attributes.empty(), Context.current()); + + assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isFalse(); + } } From 30d81f569ddba83ec8dc6a03e087320af8571f27 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 00:53:57 +0200 Subject: [PATCH 21/32] new tests --- .../state/AsynchronousMetricStorageTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index e33e3d7180..6e9fcc5d63 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -456,4 +456,50 @@ class AsynchronousMetricStorageTest { .anySatisfy(point -> assertThat(point).isSameAs(firstCollectionPoint)); } } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisable_isEmpty(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEnabled(false); + + assertThat(longCounterStorage.isEmpty()).isTrue(); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisableThenEnable_isEmpty(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEnabled(false); + longCounterStorage.setEnabled(true); + + assertThat(longCounterStorage.isEmpty()).isFalse(); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEnabled(false); + + longCounterStorage.record(Attributes.empty(), 10); + + assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isTrue(); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.setEnabled(false); + longCounterStorage.setEnabled(true); + + longCounterStorage.record(Attributes.empty(), 10); + + assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isFalse(); + } } From 21d36d5c425d32b565e6f7753b8262e1d52d6e75 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 00:55:08 +0200 Subject: [PATCH 22/32] memory mode --- .../state/SynchronousMetricStorageTest.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 59833be108..1467c853e5 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -836,9 +836,10 @@ public class SynchronousMetricStorageTest { return argumentsList.stream(); } - @Test - void enabledThenDisable_isEnabled() { - initialize(MemoryMode.REUSABLE_DATA); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisable_isEnabled(MemoryMode memoryMode) { + initialize(memoryMode); DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( @@ -849,9 +850,10 @@ public class SynchronousMetricStorageTest { assertThat(storage.isEnabled()).isFalse(); } - @Test - void enabledThenDisableThenEnable_isEnabled() { - initialize(MemoryMode.REUSABLE_DATA); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisableThenEnable_isEnabled(MemoryMode memoryMode) { + initialize(memoryMode); DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( @@ -863,9 +865,10 @@ public class SynchronousMetricStorageTest { assertThat(storage.isEnabled()).isTrue(); } - @Test - void enabledThenDisable_recordAndCollect() { - initialize(MemoryMode.REUSABLE_DATA); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisable_recordAndCollect(MemoryMode memoryMode) { + initialize(memoryMode); DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( @@ -878,9 +881,10 @@ public class SynchronousMetricStorageTest { assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isTrue(); } - @Test - void enabledThenDisableThenEnable_recordAndCollect() { - initialize(MemoryMode.REUSABLE_DATA); + @ParameterizedTest + @EnumSource(MemoryMode.class) + void enabledThenDisableThenEnable_recordAndCollect(MemoryMode memoryMode) { + initialize(memoryMode); DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( From 4196b1745201d4b8b8b1175e6d4011e33a60300d Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 01:34:49 +0200 Subject: [PATCH 23/32] cc --- .../state/AsynchronousMetricStorage.java | 67 +++++++++---------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index b8a1450156..0855a66ea3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -55,6 +55,9 @@ public final class AsynchronousMetricStorage originalAggregator; + + private Aggregator aggregator; /** * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot @@ -70,7 +73,9 @@ public final class AsynchronousMetricStorage reusablePointsPool; + private final ObjectPool> reusableHandlesPool; private final Function> handleBuilder; + private final BiConsumer> handleReleaser; private final BiConsumer pointReleaser; private final List reusablePointsList = new ArrayList<>(); @@ -78,11 +83,6 @@ public final class AsynchronousMetricStorage reusablePointsMap = new PooledHashMap<>(); - // deliberately not volatile because of performance concerns - // - which means its eventually consistent - private AggregatorHolder aggregatorHolder; - private final Aggregator originalAggregator; - // Time information relative to recording of data in aggregatorHandles, set while calling // callbacks private long startEpochNanos; @@ -101,15 +101,16 @@ public final class AsynchronousMetricStorage(aggregator::createReusablePoint); + this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); + this.handleBuilder = ignored -> reusableHandlesPool.borrowObject(); + this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); this.pointReleaser = (ignored, point) -> reusablePointsPool.returnObject(point); - this.aggregatorHolder = new AggregatorHolder<>(aggregator); - this.originalAggregator = aggregator; - this.handleBuilder = ignored -> aggregatorHolder.reusableHandlesPool.borrowObject(); - if (memoryMode == REUSABLE_DATA) { this.lastPoints = new PooledHashMap<>(); this.aggregatorHandles = new PooledHashMap<>(); @@ -200,24 +201,20 @@ public final class AsynchronousMetricStorage localAggregatorHolder = aggregatorHolder; - Collection result = aggregationTemporality == AggregationTemporality.DELTA ? collectWithDeltaAggregationTemporality() : collectWithCumulativeAggregationTemporality(); // collectWith*AggregationTemporality() methods are responsible for resetting the handle - aggregatorHandles.forEach(localAggregatorHolder.handleReleaser); + aggregatorHandles.forEach(handleReleaser); aggregatorHandles.clear(); - return localAggregatorHolder.aggregator.toMetricData( + return aggregator.toMetricData( resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality); } private Collection collectWithDeltaAggregationTemporality() { - AggregatorHolder localAggregatorHolder = aggregatorHolder; - Map currentPoints; if (memoryMode == REUSABLE_DATA) { // deltaPoints computed in the previous collection can be released @@ -240,7 +237,7 @@ public final class AsynchronousMetricStorage(originalAggregator); + if (aggregator == Aggregator.drop()) { + aggregator = originalAggregator; } } else { - aggregatorHolder = new AggregatorHolder<>(Aggregator.drop()); + aggregator = Aggregator.drop(); + + if (memoryMode == REUSABLE_DATA) { + aggregatorHandles.forEach( + (attributes, handle) -> { + handle.aggregateThenMaybeReset(0, 0, Attributes.empty(), /* reset= */ true); + reusableHandlesPool.returnObject(handle); + }); + lastPoints.forEach(pointReleaser); + } + aggregatorHandles.clear(); + lastPoints.clear(); } } @Override public boolean isEmpty() { - return aggregatorHolder.aggregator == Aggregator.drop(); - } - - private static final class AggregatorHolder { - - private final Aggregator aggregator; - private final ObjectPool> reusableHandlesPool; - private final BiConsumer> handleReleaser; - - private AggregatorHolder(Aggregator aggregator) { - this.aggregator = aggregator; - this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle); - this.handleReleaser = (ignored, handle) -> reusableHandlesPool.returnObject(handle); - } + return aggregator == Aggregator.drop(); } } From 3c101543ba6fb819fabe826a3be03ca802e4f063 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 01:35:01 +0200 Subject: [PATCH 24/32] aggregator reset tests --- .../state/AsynchronousMetricStorageTest.java | 21 ++++++++++++++++ .../state/SynchronousMetricStorageTest.java | 25 +++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 6e9fcc5d63..80e00ee6bd 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -502,4 +502,25 @@ class AsynchronousMetricStorageTest { assertThat(longCounterStorage.collect(resource, scope, 0, 0).isEmpty()).isFalse(); } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void disableDropsAggregatorState(MemoryMode memoryMode) { + setup(memoryMode); + + longCounterStorage.record(Attributes.empty(), 10); + + longCounterStorage.setEnabled(false); + longCounterStorage.setEnabled(true); + + longCounterStorage.record(Attributes.empty(), 5); + + MetricData metricData = longCounterStorage.collect(resource, scope, 0, 0); + assertThat(metricData) + .hasLongSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5d))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 1467c853e5..bde4d7c3c7 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -897,4 +897,29 @@ public class SynchronousMetricStorageTest { assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10).isEmpty()).isFalse(); } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void disableDropsAggregatorState(MemoryMode memoryMode) { + initialize(memoryMode); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.recordDouble(10d, Attributes.empty(), Context.current()); + + storage.setEnabled(false); + storage.setEnabled(true); + + storage.recordDouble(5d, Attributes.empty(), Context.current()); + + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5d))); + } } From 9b129189fca6fc0fb11dd3041bb18fe9171ef333 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 01:47:39 +0200 Subject: [PATCH 25/32] more tests --- .../state/AsynchronousMetricStorageTest.java | 38 ++++++++++++++++++- .../state/SynchronousMetricStorageTest.java | 26 +++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 80e00ee6bd..06de4e851a 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -521,6 +521,42 @@ class AsynchronousMetricStorageTest { sum -> sum.satisfies( sumData -> - assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5d))); + assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5))); + } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void collect_DeltaResetAfterDisabled(MemoryMode memoryMode) { + setup(memoryMode); + + when(reader.getAggregationTemporality(any())).thenReturn(AggregationTemporality.DELTA); + longCounterStorage = + AsynchronousMetricStorage.create( + registeredReader, + registeredView, + InstrumentDescriptor.create( + "long-counter", + "description", + "unit", + InstrumentType.COUNTER, + InstrumentValueType.LONG, + Advice.empty())); + + longCounterStorage.setEpochInformation(0, 10); + longCounterStorage.record(Attributes.empty(), 5); + longCounterStorage.collect(resource, scope, 0, 0); + + longCounterStorage.setEnabled(false); + longCounterStorage.setEnabled(true); + + longCounterStorage.setEpochInformation(0, 30); + longCounterStorage.record(Attributes.empty(), 4); + MetricData metricData = longCounterStorage.collect(resource, scope, 0, 0); + assertThat(metricData) + .hasLongSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 4))); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index bde4d7c3c7..8b94bca20b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -922,4 +922,30 @@ public class SynchronousMetricStorageTest { sumData -> assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 5d))); } + + @ParameterizedTest + @EnumSource(MemoryMode.class) + void collect_DeltaResetAfterDisabled(MemoryMode memoryMode) { + initialize(memoryMode); + + DefaultSynchronousMetricStorage storage = + new DefaultSynchronousMetricStorage<>( + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); + + storage.recordDouble(5d, Attributes.empty(), Context.current()); + storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10); + deltaReader.setLastCollectEpochNanos(10); + + storage.setEnabled(false); + storage.setEnabled(true); + + storage.recordDouble(4d, Attributes.empty(), Context.current()); + MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30); + assertThat(metricData) + .hasDoubleSumSatisfying( + sum -> + sum.satisfies( + sumData -> + assertThat(sumData.getPoints()).allMatch(point -> point.getValue() == 4d))); + } } From b21fa7ba1a2b7097fbc4c0ebf3925f61b869937b Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 01:49:31 +0200 Subject: [PATCH 26/32] ops --- .../metrics/internal/state/DefaultSynchronousMetricStorage.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 41494ce1ce..715d2e5ed8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -95,7 +95,6 @@ public final class DefaultSynchronousMetricStorage> getAggregatorHandlePool() { return aggregatorHandlePool; } From e58c13c3d3c6d5b68e8f7f2e28b36beba660aa5d Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 02:38:20 +0200 Subject: [PATCH 27/32] nn --- .../metrics/internal/state/AsynchronousMetricStorage.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 0855a66ea3..e2e4113bf8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -321,14 +321,6 @@ public final class AsynchronousMetricStorage { - handle.aggregateThenMaybeReset(0, 0, Attributes.empty(), /* reset= */ true); - reusableHandlesPool.returnObject(handle); - }); - lastPoints.forEach(pointReleaser); - } aggregatorHandles.clear(); lastPoints.clear(); } From d747d660af38b20d8da2a8567c9e38280134ef20 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 02:42:08 +0200 Subject: [PATCH 28/32] clear --- .../metrics/internal/state/DefaultSynchronousMetricStorage.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 715d2e5ed8..abb11b1ad9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -137,6 +137,7 @@ public final class DefaultSynchronousMetricStorage localAggregatorHolder = Objects.requireNonNull(aggregatorHolder.get()); if (localAggregatorHolder.aggregator == Aggregator.drop()) { AggregatorHolder newHolder = new AggregatorHolder<>(originalAggregator); + previousCollectionAggregatorHandles.clear(); // If this fails, another thread called `setEnabled` and we can discard the current call aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder); } From 6ca257d3ffbf3bb545a599180eb9660fc60812f4 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 03:12:11 +0200 Subject: [PATCH 29/32] no order --- .../java/io/opentelemetry/sdk/metrics/MeterConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java b/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java index 1758aa221e..f09a7f058b 100644 --- a/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java +++ b/sdk/metrics/src/testIncubating/java/io/opentelemetry/sdk/metrics/MeterConfigTest.java @@ -300,7 +300,7 @@ class MeterConfigTest { meterB.counterBuilder("meterB").build().add(2); meterC.counterBuilder("meterC").build().add(3); assertThat(reader.collectAllMetrics()) - .satisfiesExactly( + .satisfiesExactlyInAnyOrder( metricData -> assertThat(metricData).hasName("meterA"), metricData -> assertThat(metricData).hasName("meterC")); } From 5f33bee12940c52c43f11569da4c67d2887cdb3c Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Thu, 22 May 2025 13:25:43 +0100 Subject: [PATCH 30/32] Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java --- .../metrics/internal/state/DefaultSynchronousMetricStorage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index abb11b1ad9..56e9c01269 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -233,7 +233,7 @@ public final class DefaultSynchronousMetricStorage(holder.aggregator, previousCollectionAggregatorHandles) : new AggregatorHolder<>(holder.aggregator); - // Otherwise, swapAggregator was called and the update should be ignored + // If this fails, another thread called `setEnabled` and we can discard the current call aggregatorHolder.compareAndSet(holder, newHolder); // Increment recordsInProgress by 1, which produces an odd number acting as a signal that From e5de3449370d803868b931fbb53b5f44941b48f8 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Fri, 23 May 2025 09:09:35 +0100 Subject: [PATCH 31/32] Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java --- .../internal/state/DefaultSynchronousMetricStorage.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 56e9c01269..556a4e6d7e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -137,9 +137,10 @@ public final class DefaultSynchronousMetricStorage localAggregatorHolder = Objects.requireNonNull(aggregatorHolder.get()); if (localAggregatorHolder.aggregator == Aggregator.drop()) { AggregatorHolder newHolder = new AggregatorHolder<>(originalAggregator); - previousCollectionAggregatorHandles.clear(); // If this fails, another thread called `setEnabled` and we can discard the current call - aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder); + if (aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder)) { + previousCollectionAggregatorHandles.clear(); + } } } else { aggregatorHolder.set(new AggregatorHolder<>(Aggregator.drop())); From 7ab9b3a567e49aa57e59165edbbc78c9e5f98048 Mon Sep 17 00:00:00 2001 From: Francesco Andreuzzi Date: Fri, 23 May 2025 08:12:17 +0000 Subject: [PATCH 32/32] spotless --- .../metrics/internal/state/DefaultSynchronousMetricStorage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 556a4e6d7e..5d06f6e097 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -139,7 +139,7 @@ public final class DefaultSynchronousMetricStorage newHolder = new AggregatorHolder<>(originalAggregator); // If this fails, another thread called `setEnabled` and we can discard the current call if (aggregatorHolder.compareAndSet(localAggregatorHolder, newHolder)) { - previousCollectionAggregatorHandles.clear(); + previousCollectionAggregatorHandles.clear(); } } } else {