From 620ae4caf6d3de65353e02c1ad41df6f63686d48 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 20 Apr 2021 11:00:30 -0500 Subject: [PATCH] Add full delta support for sum aggregator (#3161) * Add full support for delta temporality for sum aggregator. * Respond to PR feedback. --- .../sdk/metrics/ViewRegistry.java | 5 +- .../aggregator/AbstractSumAggregator.java | 87 ++++++++++++++++--- .../sdk/metrics/aggregator/Aggregator.java | 4 +- .../metrics/aggregator/AggregatorFactory.java | 19 +++- .../aggregator/DoubleSumAggregator.java | 13 ++- .../metrics/aggregator/LongSumAggregator.java | 14 ++- .../aggregator/SumAggregatorFactory.java | 18 ++-- .../sdk/metrics/DoubleSumObserverSdkTest.java | 79 +++++++++++++++-- .../DoubleUpDownSumObserverSdkTest.java | 79 +++++++++++++++-- .../sdk/metrics/LongSumObserverSdkTest.java | 77 ++++++++++++++-- .../metrics/LongUpDownSumObserverSdkTest.java | 79 +++++++++++++++-- .../sdk/metrics/SdkMeterProviderTest.java | 4 +- .../aggregator/AggregatorFactoryTest.java | 2 +- .../aggregator/DoubleSumAggregatorTest.java | 26 +++++- .../aggregator/LongSumAggregatorTest.java | 26 +++++- 15 files changed, 466 insertions(+), 66 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java index 78a0c3e88c..2bd49f1186 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewRegistry.java @@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics; import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.view.InstrumentSelector; import io.opentelemetry.sdk.metrics.view.View; import java.util.EnumMap; @@ -26,7 +27,9 @@ import java.util.regex.Pattern; final class ViewRegistry { private static final LinkedHashMap EMPTY_CONFIG = new LinkedHashMap<>(); static final View CUMULATIVE_SUM = - View.builder().setAggregatorFactory(AggregatorFactory.sum(true)).build(); + View.builder() + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.CUMULATIVE)) + .build(); static final View SUMMARY = View.builder().setAggregatorFactory(AggregatorFactory.minMaxSumCount()).build(); static final View LAST_VALUE = diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AbstractSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AbstractSumAggregator.java index e177b81035..92dbe4b388 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AbstractSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AbstractSumAggregator.java @@ -14,25 +14,83 @@ import io.opentelemetry.sdk.resources.Resource; abstract class AbstractSumAggregator extends AbstractAggregator { private final boolean isMonotonic; private final AggregationTemporality temporality; + private final MergeStrategy mergeStrategy; AbstractSumAggregator( Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentDescriptor instrumentDescriptor, - boolean stateful) { - super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful); - this.isMonotonic = - instrumentDescriptor.getType() == InstrumentType.COUNTER - || instrumentDescriptor.getType() == InstrumentType.SUM_OBSERVER; - AggregationTemporality temp = - isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA; - if (instrumentDescriptor.getType() == InstrumentType.SUM_OBSERVER - || instrumentDescriptor.getType() == InstrumentType.UP_DOWN_SUM_OBSERVER) { - temp = AggregationTemporality.CUMULATIVE; - } - this.temporality = temp; + AggregationTemporality temporality) { + super( + resource, + instrumentationLibraryInfo, + instrumentDescriptor, + resolveStateful(instrumentDescriptor.getType(), temporality)); + InstrumentType type = instrumentDescriptor.getType(); + this.isMonotonic = type == InstrumentType.COUNTER || type == InstrumentType.SUM_OBSERVER; + this.temporality = temporality; + this.mergeStrategy = resolveMergeStrategy(type, temporality); } + /** + * Resolve whether the aggregator should be stateful. For the special case {@link + * InstrumentType#SUM_OBSERVER} and {@link InstrumentType#UP_DOWN_SUM_OBSERVER} instruments, state + * is required if temporality is {@link AggregationTemporality#DELTA}. Because the observed values + * are cumulative sums, we must maintain state to compute delta sums between collections. For + * other instruments, state is required if temporality is {@link + * AggregationTemporality#CUMULATIVE}. + * + * @param instrumentType the instrument type + * @param temporality the temporality + * @return whether the aggregator is stateful + */ + private static boolean resolveStateful( + InstrumentType instrumentType, AggregationTemporality temporality) { + if (instrumentType == InstrumentType.SUM_OBSERVER + || instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER) { + return temporality == AggregationTemporality.DELTA; + } else { + return temporality == AggregationTemporality.CUMULATIVE; + } + } + + /** + * Resolve the aggregator merge strategy. The merge strategy is SUM in all cases except where + * temporality is {@link AggregationTemporality#DELTA} and instrument type is {@link + * InstrumentType#SUM_OBSERVER} or {@link InstrumentType#UP_DOWN_SUM_OBSERVER}. In these special + * cases, the observed values are cumulative sums so we must take a diff to compute the delta sum. + * + * @param instrumentType the instrument type + * @param temporality the temporality + * @return the merge strategy + */ + // Visible for testing + static MergeStrategy resolveMergeStrategy( + InstrumentType instrumentType, AggregationTemporality temporality) { + if ((instrumentType == InstrumentType.SUM_OBSERVER + || instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER) + && temporality == AggregationTemporality.DELTA) { + return MergeStrategy.DIFF; + } else { + return MergeStrategy.SUM; + } + } + + @Override + public final T merge(T previousAccumulation, T accumulation) { + switch (mergeStrategy) { + case SUM: + return mergeSum(previousAccumulation, accumulation); + case DIFF: + return mergeDiff(previousAccumulation, accumulation); + } + throw new IllegalStateException("Unsupported merge strategy: " + mergeStrategy.name()); + } + + abstract T mergeSum(T previousAccumulation, T accumulation); + + abstract T mergeDiff(T previousAccumulation, T accumulation); + final boolean isMonotonic() { return isMonotonic; } @@ -40,4 +98,9 @@ abstract class AbstractSumAggregator extends AbstractAggregator { final AggregationTemporality temporality() { return temporality; } + + enum MergeStrategy { + SUM, + DIFF + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/Aggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/Aggregator.java index afeb1eaa26..a3e0340c3e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/Aggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/Aggregator.java @@ -57,9 +57,11 @@ public interface Aggregator { /** * Returns the result of the merge of the given accumulations. * + * @param previousAccumulation the previously captured accumulation + * @param accumulation the newly captured accumulation * @return the result of the merge of the given accumulations. */ - T merge(T a1, T a2); + T merge(T previousAccumulation, T accumulation); /** * Returns {@code true} if the processor needs to keep the previous collected state in order to diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java index 98dae8d32a..6714cd4378 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactory.java @@ -26,9 +26,26 @@ public interface AggregatorFactory { * if {@code true} OR {@link AggregationTemporality#DELTA} for all types except SumObserver * and UpDownSumObserver which will always produce {@link AggregationTemporality#CUMULATIVE}. * @return an {@code AggregationFactory} that calculates sum of recorded measurements. + * @deprecated Use {@link AggregatorFactory#sum(AggregationTemporality)} */ + @Deprecated static AggregatorFactory sum(boolean alwaysCumulative) { - return new SumAggregatorFactory(alwaysCumulative); + return new SumAggregatorFactory( + alwaysCumulative ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA); + } + + /** + * Returns an {@code AggregationFactory} that calculates sum of recorded measurements. + * + *

This factory produces {@link Aggregator} that will always produce Sum metrics, the + * monotonicity is determined based on the instrument type (for Counter and SumObserver will be + * monotonic, otherwise not). + * + * @param temporality configures what temporality to be produced for the Sum metrics. + * @return an {@code AggregationFactory} that calculates sum of recorded measurements. + */ + static AggregatorFactory sum(AggregationTemporality temporality) { + return new SumAggregatorFactory(temporality); } /** diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregator.java index e6405317e8..0bd1522064 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregator.java @@ -20,8 +20,8 @@ final class DoubleSumAggregator extends AbstractSumAggregator { Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentDescriptor descriptor, - boolean stateful) { - super(resource, instrumentationLibraryInfo, descriptor, stateful); + AggregationTemporality temporality) { + super(resource, instrumentationLibraryInfo, descriptor, temporality); } @Override @@ -35,8 +35,13 @@ final class DoubleSumAggregator extends AbstractSumAggregator { } @Override - public final Double merge(Double a1, Double a2) { - return a1 + a2; + Double mergeSum(Double previousAccumulation, Double accumulation) { + return previousAccumulation + accumulation; + } + + @Override + Double mergeDiff(Double previousAccumulation, Double accumulation) { + return accumulation - previousAccumulation; } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregator.java index 08a8cfa615..52a1667dcf 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregator.java @@ -16,12 +16,13 @@ import java.util.Map; import java.util.concurrent.atomic.LongAdder; final class LongSumAggregator extends AbstractSumAggregator { + LongSumAggregator( Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentDescriptor descriptor, - boolean stateful) { - super(resource, instrumentationLibraryInfo, descriptor, stateful); + AggregationTemporality temporality) { + super(resource, instrumentationLibraryInfo, descriptor, temporality); } @Override @@ -35,8 +36,13 @@ final class LongSumAggregator extends AbstractSumAggregator { } @Override - public Long merge(Long a1, Long a2) { - return a1 + a2; + Long mergeSum(Long previousAccumulation, Long accumulation) { + return previousAccumulation + accumulation; + } + + @Override + Long mergeDiff(Long previousAccumulation, Long accumulation) { + return accumulation - previousAccumulation; } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/SumAggregatorFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/SumAggregatorFactory.java index 11767afd67..ce8a25efd3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/SumAggregatorFactory.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/aggregator/SumAggregatorFactory.java @@ -7,14 +7,15 @@ package io.opentelemetry.sdk.metrics.aggregator; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; -import io.opentelemetry.sdk.metrics.common.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.resources.Resource; final class SumAggregatorFactory implements AggregatorFactory { - private final boolean alwaysCumulative; - SumAggregatorFactory(boolean alwaysCumulative) { - this.alwaysCumulative = alwaysCumulative; + private final AggregationTemporality temporality; + + SumAggregatorFactory(AggregationTemporality temporality) { + this.temporality = temporality; } @Override @@ -23,18 +24,13 @@ final class SumAggregatorFactory implements AggregatorFactory { Resource resource, InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentDescriptor descriptor) { - boolean stateful = alwaysCumulative; - if (descriptor.getType() == InstrumentType.SUM_OBSERVER - || descriptor.getType() == InstrumentType.UP_DOWN_SUM_OBSERVER) { - stateful = false; - } switch (descriptor.getValueType()) { case LONG: return (Aggregator) - new LongSumAggregator(resource, instrumentationLibraryInfo, descriptor, stateful); + new LongSumAggregator(resource, instrumentationLibraryInfo, descriptor, temporality); case DOUBLE: return (Aggregator) - new DoubleSumAggregator(resource, instrumentationLibraryInfo, descriptor, stateful); + new DoubleSumAggregator(resource, instrumentationLibraryInfo, descriptor, temporality); } throw new IllegalArgumentException("Invalid instrument value type"); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleSumObserverSdkTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleSumObserverSdkTest.java index 4d829dc835..7308d0fe9c 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleSumObserverSdkTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleSumObserverSdkTest.java @@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.internal.TestClock; +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; +import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -29,13 +33,14 @@ class DoubleSumObserverSdkTest { private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = InstrumentationLibraryInfo.create(DoubleSumObserverSdkTest.class.getName(), null); private final TestClock testClock = TestClock.create(); - private final SdkMeterProvider sdkMeterProvider = - SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); - private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName()); + private final SdkMeterProviderBuilder sdkMeterProviderBuilder = + SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE); @Test void collectMetrics_NoCallback() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleSumObserverBuilder("testObserver") .setDescription("My own DoubleSumObserver") .setUnit("ms") @@ -45,7 +50,9 @@ class DoubleSumObserverSdkTest { @Test void collectMetrics_NoRecords() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleSumObserverBuilder("testObserver") .setDescription("My own DoubleSumObserver") .setUnit("ms") @@ -56,7 +63,9 @@ class DoubleSumObserverSdkTest { @Test void collectMetrics_WithOneRecord() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleSumObserverBuilder("testObserver") .setDescription("My own DoubleSumObserver") .setUnit("ms") @@ -99,4 +108,60 @@ class DoubleSumObserverSdkTest { Labels.of("k", "v"), 12.1d))))); } + + @Test + void collectMetrics_DeltaSumAggregator() { + SdkMeterProvider sdkMeterProvider = + sdkMeterProviderBuilder + .registerView( + InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(), + View.builder() + .setLabelsProcessorFactory(LabelsProcessorFactory.noop()) + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA)) + .build()) + .build(); + sdkMeterProvider + .get(getClass().getName()) + .doubleSumObserverBuilder("testObserver") + .setDescription("My own DoubleSumObserver") + .setUnit("ms") + .setUpdater(result -> result.observe(12.1d, Labels.of("k", "v"))) + .build(); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createDoubleSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "My own DoubleSumObserver", + "ms", + DoubleSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.DELTA, + Collections.singletonList( + DoublePointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 12.1d))))); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createDoubleSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "My own DoubleSumObserver", + "ms", + DoubleSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.DELTA, + Collections.singletonList( + DoublePointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 0))))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleUpDownSumObserverSdkTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleUpDownSumObserverSdkTest.java index 967eb5d8c8..817413d165 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleUpDownSumObserverSdkTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/DoubleUpDownSumObserverSdkTest.java @@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.internal.TestClock; +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; +import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -29,13 +33,14 @@ class DoubleUpDownSumObserverSdkTest { private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = InstrumentationLibraryInfo.create(DoubleUpDownSumObserverSdkTest.class.getName(), null); private final TestClock testClock = TestClock.create(); - private final SdkMeterProvider sdkMeterProvider = - SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); - private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName()); + private final SdkMeterProviderBuilder sdkMeterProviderBuilder = + SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE); @Test void collectMetrics_NoCallback() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleUpDownSumObserverBuilder("testObserver") .setDescription("My own DoubleUpDownSumObserver") .setUnit("ms") @@ -45,7 +50,9 @@ class DoubleUpDownSumObserverSdkTest { @Test void collectMetrics_NoRecords() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleUpDownSumObserverBuilder("testObserver") .setDescription("My own DoubleUpDownSumObserver") .setUnit("ms") @@ -56,7 +63,9 @@ class DoubleUpDownSumObserverSdkTest { @Test void collectMetrics_WithOneRecord() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .doubleUpDownSumObserverBuilder("testObserver") .setUpdater(result -> result.observe(12.1d, Labels.of("k", "v"))) .build(); @@ -97,4 +106,60 @@ class DoubleUpDownSumObserverSdkTest { Labels.of("k", "v"), 12.1d))))); } + + @Test + void collectMetrics_DeltaSumAggregator() { + SdkMeterProvider sdkMeterProvider = + sdkMeterProviderBuilder + .registerView( + InstrumentSelector.builder() + .setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER) + .build(), + View.builder() + .setLabelsProcessorFactory(LabelsProcessorFactory.noop()) + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA)) + .build()) + .build(); + sdkMeterProvider + .get(getClass().getName()) + .doubleUpDownSumObserverBuilder("testObserver") + .setUpdater(result -> result.observe(12.1d, Labels.of("k", "v"))) + .build(); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createDoubleSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + DoubleSumData.create( + /* isMonotonic= */ false, + AggregationTemporality.DELTA, + Collections.singletonList( + DoublePointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 12.1d))))); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createDoubleSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + DoubleSumData.create( + /* isMonotonic= */ false, + AggregationTemporality.DELTA, + Collections.singletonList( + DoublePointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 0))))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongSumObserverSdkTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongSumObserverSdkTest.java index f2a23ca5cd..0e55051716 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongSumObserverSdkTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongSumObserverSdkTest.java @@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.internal.TestClock; +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.LongSumData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; +import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -29,13 +33,14 @@ class LongSumObserverSdkTest { private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = InstrumentationLibraryInfo.create(LongSumObserverSdkTest.class.getName(), null); private final TestClock testClock = TestClock.create(); - private final SdkMeterProvider sdkMeterProvider = - SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); - private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName()); + private final SdkMeterProviderBuilder sdkMeterProviderBuilder = + SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE); @Test void collectMetrics_NoCallback() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longSumObserverBuilder("testObserver") .setDescription("My own LongSumObserver") .setUnit("ms") @@ -45,7 +50,9 @@ class LongSumObserverSdkTest { @Test void collectMetrics_NoRecords() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longSumObserverBuilder("testObserver") .setDescription("My own LongSumObserver") .setUnit("ms") @@ -56,7 +63,9 @@ class LongSumObserverSdkTest { @Test void collectMetrics_WithOneRecord() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longSumObserverBuilder("testObserver") .setUpdater(result -> result.observe(12, Labels.of("k", "v"))) .build(); @@ -97,4 +106,58 @@ class LongSumObserverSdkTest { Labels.of("k", "v"), 12))))); } + + @Test + void collectMetrics_DeltaSumAggregator() { + SdkMeterProvider sdkMeterProvider = + sdkMeterProviderBuilder + .registerView( + InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(), + View.builder() + .setLabelsProcessorFactory(LabelsProcessorFactory.noop()) + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA)) + .build()) + .build(); + sdkMeterProvider + .get(getClass().getName()) + .longSumObserverBuilder("testObserver") + .setUpdater(result -> result.observe(12, Labels.of("k", "v"))) + .build(); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createLongSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + LongSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.DELTA, + Collections.singletonList( + LongPointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 12))))); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createLongSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + LongSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.DELTA, + Collections.singletonList( + LongPointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 0))))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongUpDownSumObserverSdkTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongUpDownSumObserverSdkTest.java index 2fbd62be9f..0eb4229192 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongUpDownSumObserverSdkTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/LongUpDownSumObserverSdkTest.java @@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.internal.TestClock; +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.LongSumData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory; +import io.opentelemetry.sdk.metrics.view.InstrumentSelector; +import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.resources.Resource; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -29,13 +33,14 @@ class LongUpDownSumObserverSdkTest { private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = InstrumentationLibraryInfo.create(LongUpDownSumObserverSdkTest.class.getName(), null); private final TestClock testClock = TestClock.create(); - private final SdkMeterProvider sdkMeterProvider = - SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); - private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName()); + private final SdkMeterProviderBuilder sdkMeterProviderBuilder = + SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE); @Test void collectMetrics_NoCallback() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longUpDownSumObserverBuilder("testObserver") .setDescription("My own LongUpDownSumObserver") .setUnit("ms") @@ -45,7 +50,9 @@ class LongUpDownSumObserverSdkTest { @Test void collectMetrics_NoRecords() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longUpDownSumObserverBuilder("testObserver") .setDescription("My own LongUpDownSumObserver") .setUnit("ms") @@ -56,7 +63,9 @@ class LongUpDownSumObserverSdkTest { @Test void collectMetrics_WithOneRecord() { - sdkMeter + SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); + sdkMeterProvider + .get(getClass().getName()) .longUpDownSumObserverBuilder("testObserver") .setUpdater(result -> result.observe(12, Labels.of("k", "v"))) .build(); @@ -97,4 +106,60 @@ class LongUpDownSumObserverSdkTest { Labels.of("k", "v"), 12))))); } + + @Test + void collectMetrics_DeltaSumAggregator() { + SdkMeterProvider sdkMeterProvider = + sdkMeterProviderBuilder + .registerView( + InstrumentSelector.builder() + .setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER) + .build(), + View.builder() + .setLabelsProcessorFactory(LabelsProcessorFactory.noop()) + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA)) + .build()) + .build(); + sdkMeterProvider + .get(getClass().getName()) + .longUpDownSumObserverBuilder("testObserver") + .setUpdater(result -> result.observe(12, Labels.of("k", "v"))) + .build(); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createLongSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + LongSumData.create( + /* isMonotonic= */ false, + AggregationTemporality.DELTA, + Collections.singletonList( + LongPointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 12))))); + testClock.advanceNanos(SECOND_NANOS); + assertThat(sdkMeterProvider.collectAllMetrics()) + .containsExactly( + MetricData.createLongSum( + RESOURCE, + INSTRUMENTATION_LIBRARY_INFO, + "testObserver", + "", + "1", + LongSumData.create( + /* isMonotonic= */ false, + AggregationTemporality.DELTA, + Collections.singletonList( + LongPointData.create( + testClock.now() - SECOND_NANOS, + testClock.now(), + Labels.of("k", "v"), + 0))))); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java index 9f7428007c..e6bdabbd59 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java @@ -165,7 +165,9 @@ public class SdkMeterProviderTest { void collectAllSyncInstruments_OverwriteTemporality() { sdkMeterProviderBuilder.registerView( InstrumentSelector.builder().setInstrumentType(InstrumentType.COUNTER).build(), - View.builder().setAggregatorFactory(AggregatorFactory.sum(false)).build()); + View.builder() + .setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA)) + .build()); SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); Meter sdkMeter = sdkMeterProvider.get(SdkMeterProviderTest.class.getName()); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java index d528a553e6..df1f668bc2 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/AggregatorFactoryTest.java @@ -102,7 +102,7 @@ class AggregatorFactoryTest { @Test void getSumAggregatorFactory() { - AggregatorFactory sum = AggregatorFactory.sum(false); + AggregatorFactory sum = AggregatorFactory.sum(AggregationTemporality.DELTA); assertThat( sum.create( Resource.getDefault(), diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregatorTest.java index f6305710e4..0448e36f2b 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/DoubleSumAggregatorTest.java @@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.aggregator.AbstractSumAggregator.MergeStrategy; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentValueType; @@ -28,7 +29,7 @@ class DoubleSumAggregatorTest { InstrumentationLibraryInfo.empty(), InstrumentDescriptor.create( "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE), - /* stateful= */ true); + AggregationTemporality.CUMULATIVE); @Test void createHandle() { @@ -74,6 +75,29 @@ class DoubleSumAggregatorTest { assertThat(aggregatorHandle.accumulateThenReset()).isNull(); } + @Test + void merge() { + for (InstrumentType instrumentType : InstrumentType.values()) { + for (AggregationTemporality temporality : AggregationTemporality.values()) { + DoubleSumAggregator aggregator = + new DoubleSumAggregator( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", "description", "unit", instrumentType, InstrumentValueType.LONG), + temporality); + MergeStrategy expectedMergeStrategy = + AbstractSumAggregator.resolveMergeStrategy(instrumentType, temporality); + double merged = aggregator.merge(1.0d, 2.0d); + assertThat(merged) + .withFailMessage( + "Invalid merge result for instrumentType %s, temporality %s: %s", + instrumentType, temporality, merged) + .isEqualTo(expectedMergeStrategy == MergeStrategy.SUM ? 3.0d : 1.0d); + } + } + } + @Test void toMetricData() { AggregatorHandle aggregatorHandle = aggregator.createHandle(); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregatorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregatorTest.java index ec6680ba04..283c04e8be 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregatorTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/aggregator/LongSumAggregatorTest.java @@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.aggregator.AbstractSumAggregator.MergeStrategy; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentValueType; @@ -28,7 +29,7 @@ class LongSumAggregatorTest { InstrumentationLibraryInfo.empty(), InstrumentDescriptor.create( "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.LONG), - /* stateful= */ true); + AggregationTemporality.CUMULATIVE); @Test void createHandle() { @@ -76,6 +77,29 @@ class LongSumAggregatorTest { assertThat(aggregatorHandle.accumulateThenReset()).isNull(); } + @Test + void merge() { + for (InstrumentType instrumentType : InstrumentType.values()) { + for (AggregationTemporality temporality : AggregationTemporality.values()) { + LongSumAggregator aggregator = + new LongSumAggregator( + Resource.getDefault(), + InstrumentationLibraryInfo.empty(), + InstrumentDescriptor.create( + "name", "description", "unit", instrumentType, InstrumentValueType.LONG), + temporality); + MergeStrategy expectedMergeStrategy = + AbstractSumAggregator.resolveMergeStrategy(instrumentType, temporality); + long merged = aggregator.merge(1L, 2L); + assertThat(merged) + .withFailMessage( + "Invalid merge result for instrumentType %s, temporality %s: %s", + instrumentType, temporality, merged) + .isEqualTo(expectedMergeStrategy == MergeStrategy.SUM ? 3 : 1); + } + } + } + @Test void toMetricData() { AggregatorHandle aggregatorHandle = aggregator.createHandle();