Add full delta support for sum aggregator (#3161)

* Add full support for delta temporality for sum aggregator.

* Respond to PR feedback.
This commit is contained in:
jack-berg 2021-04-20 11:00:30 -05:00 committed by GitHub
parent c28d17aaea
commit 620ae4caf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 466 additions and 66 deletions

View File

@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector; import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View; import io.opentelemetry.sdk.metrics.view.View;
import java.util.EnumMap; import java.util.EnumMap;
@ -26,7 +27,9 @@ import java.util.regex.Pattern;
final class ViewRegistry { final class ViewRegistry {
private static final LinkedHashMap<Pattern, View> EMPTY_CONFIG = new LinkedHashMap<>(); private static final LinkedHashMap<Pattern, View> EMPTY_CONFIG = new LinkedHashMap<>();
static final View CUMULATIVE_SUM = static final View CUMULATIVE_SUM =
View.builder().setAggregatorFactory(AggregatorFactory.sum(true)).build(); View.builder()
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.CUMULATIVE))
.build();
static final View SUMMARY = static final View SUMMARY =
View.builder().setAggregatorFactory(AggregatorFactory.minMaxSumCount()).build(); View.builder().setAggregatorFactory(AggregatorFactory.minMaxSumCount()).build();
static final View LAST_VALUE = static final View LAST_VALUE =

View File

@ -14,25 +14,83 @@ import io.opentelemetry.sdk.resources.Resource;
abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> { abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
private final boolean isMonotonic; private final boolean isMonotonic;
private final AggregationTemporality temporality; private final AggregationTemporality temporality;
private final MergeStrategy mergeStrategy;
AbstractSumAggregator( AbstractSumAggregator(
Resource resource, Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor instrumentDescriptor, InstrumentDescriptor instrumentDescriptor,
boolean stateful) { AggregationTemporality temporality) {
super(resource, instrumentationLibraryInfo, instrumentDescriptor, stateful); super(
this.isMonotonic = resource,
instrumentDescriptor.getType() == InstrumentType.COUNTER instrumentationLibraryInfo,
|| instrumentDescriptor.getType() == InstrumentType.SUM_OBSERVER; instrumentDescriptor,
AggregationTemporality temp = resolveStateful(instrumentDescriptor.getType(), temporality));
isStateful() ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA; InstrumentType type = instrumentDescriptor.getType();
if (instrumentDescriptor.getType() == InstrumentType.SUM_OBSERVER this.isMonotonic = type == InstrumentType.COUNTER || type == InstrumentType.SUM_OBSERVER;
|| instrumentDescriptor.getType() == InstrumentType.UP_DOWN_SUM_OBSERVER) { this.temporality = temporality;
temp = AggregationTemporality.CUMULATIVE; this.mergeStrategy = resolveMergeStrategy(type, temporality);
}
this.temporality = temp;
} }
/**
* Resolve whether the aggregator should be stateful. For the special case {@link
* InstrumentType#SUM_OBSERVER} and {@link InstrumentType#UP_DOWN_SUM_OBSERVER} instruments, state
* is required if temporality is {@link AggregationTemporality#DELTA}. Because the observed values
* are cumulative sums, we must maintain state to compute delta sums between collections. For
* other instruments, state is required if temporality is {@link
* AggregationTemporality#CUMULATIVE}.
*
* @param instrumentType the instrument type
* @param temporality the temporality
* @return whether the aggregator is stateful
*/
private static boolean resolveStateful(
InstrumentType instrumentType, AggregationTemporality temporality) {
if (instrumentType == InstrumentType.SUM_OBSERVER
|| instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER) {
return temporality == AggregationTemporality.DELTA;
} else {
return temporality == AggregationTemporality.CUMULATIVE;
}
}
/**
* Resolve the aggregator merge strategy. The merge strategy is SUM in all cases except where
* temporality is {@link AggregationTemporality#DELTA} and instrument type is {@link
* InstrumentType#SUM_OBSERVER} or {@link InstrumentType#UP_DOWN_SUM_OBSERVER}. In these special
* cases, the observed values are cumulative sums so we must take a diff to compute the delta sum.
*
* @param instrumentType the instrument type
* @param temporality the temporality
* @return the merge strategy
*/
// Visible for testing
static MergeStrategy resolveMergeStrategy(
InstrumentType instrumentType, AggregationTemporality temporality) {
if ((instrumentType == InstrumentType.SUM_OBSERVER
|| instrumentType == InstrumentType.UP_DOWN_SUM_OBSERVER)
&& temporality == AggregationTemporality.DELTA) {
return MergeStrategy.DIFF;
} else {
return MergeStrategy.SUM;
}
}
@Override
public final T merge(T previousAccumulation, T accumulation) {
switch (mergeStrategy) {
case SUM:
return mergeSum(previousAccumulation, accumulation);
case DIFF:
return mergeDiff(previousAccumulation, accumulation);
}
throw new IllegalStateException("Unsupported merge strategy: " + mergeStrategy.name());
}
abstract T mergeSum(T previousAccumulation, T accumulation);
abstract T mergeDiff(T previousAccumulation, T accumulation);
final boolean isMonotonic() { final boolean isMonotonic() {
return isMonotonic; return isMonotonic;
} }
@ -40,4 +98,9 @@ abstract class AbstractSumAggregator<T> extends AbstractAggregator<T> {
final AggregationTemporality temporality() { final AggregationTemporality temporality() {
return temporality; return temporality;
} }
enum MergeStrategy {
SUM,
DIFF
}
} }

View File

@ -57,9 +57,11 @@ public interface Aggregator<T> {
/** /**
* Returns the result of the merge of the given accumulations. * Returns the result of the merge of the given accumulations.
* *
* @param previousAccumulation the previously captured accumulation
* @param accumulation the newly captured accumulation
* @return the result of the merge of the given accumulations. * @return the result of the merge of the given accumulations.
*/ */
T merge(T a1, T a2); T merge(T previousAccumulation, T accumulation);
/** /**
* Returns {@code true} if the processor needs to keep the previous collected state in order to * Returns {@code true} if the processor needs to keep the previous collected state in order to

View File

@ -26,9 +26,26 @@ public interface AggregatorFactory {
* if {@code true} OR {@link AggregationTemporality#DELTA} for all types except SumObserver * if {@code true} OR {@link AggregationTemporality#DELTA} for all types except SumObserver
* and UpDownSumObserver which will always produce {@link AggregationTemporality#CUMULATIVE}. * and UpDownSumObserver which will always produce {@link AggregationTemporality#CUMULATIVE}.
* @return an {@code AggregationFactory} that calculates sum of recorded measurements. * @return an {@code AggregationFactory} that calculates sum of recorded measurements.
* @deprecated Use {@link AggregatorFactory#sum(AggregationTemporality)}
*/ */
@Deprecated
static AggregatorFactory sum(boolean alwaysCumulative) { static AggregatorFactory sum(boolean alwaysCumulative) {
return new SumAggregatorFactory(alwaysCumulative); return new SumAggregatorFactory(
alwaysCumulative ? AggregationTemporality.CUMULATIVE : AggregationTemporality.DELTA);
}
/**
* Returns an {@code AggregationFactory} that calculates sum of recorded measurements.
*
* <p>This factory produces {@link Aggregator} that will always produce Sum metrics, the
* monotonicity is determined based on the instrument type (for Counter and SumObserver will be
* monotonic, otherwise not).
*
* @param temporality configures what temporality to be produced for the Sum metrics.
* @return an {@code AggregationFactory} that calculates sum of recorded measurements.
*/
static AggregatorFactory sum(AggregationTemporality temporality) {
return new SumAggregatorFactory(temporality);
} }
/** /**

View File

@ -20,8 +20,8 @@ final class DoubleSumAggregator extends AbstractSumAggregator<Double> {
Resource resource, Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
boolean stateful) { AggregationTemporality temporality) {
super(resource, instrumentationLibraryInfo, descriptor, stateful); super(resource, instrumentationLibraryInfo, descriptor, temporality);
} }
@Override @Override
@ -35,8 +35,13 @@ final class DoubleSumAggregator extends AbstractSumAggregator<Double> {
} }
@Override @Override
public final Double merge(Double a1, Double a2) { Double mergeSum(Double previousAccumulation, Double accumulation) {
return a1 + a2; return previousAccumulation + accumulation;
}
@Override
Double mergeDiff(Double previousAccumulation, Double accumulation) {
return accumulation - previousAccumulation;
} }
@Override @Override

View File

@ -16,12 +16,13 @@ import java.util.Map;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
final class LongSumAggregator extends AbstractSumAggregator<Long> { final class LongSumAggregator extends AbstractSumAggregator<Long> {
LongSumAggregator( LongSumAggregator(
Resource resource, Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
boolean stateful) { AggregationTemporality temporality) {
super(resource, instrumentationLibraryInfo, descriptor, stateful); super(resource, instrumentationLibraryInfo, descriptor, temporality);
} }
@Override @Override
@ -35,8 +36,13 @@ final class LongSumAggregator extends AbstractSumAggregator<Long> {
} }
@Override @Override
public Long merge(Long a1, Long a2) { Long mergeSum(Long previousAccumulation, Long accumulation) {
return a1 + a2; return previousAccumulation + accumulation;
}
@Override
Long mergeDiff(Long previousAccumulation, Long accumulation) {
return accumulation - previousAccumulation;
} }
@Override @Override

View File

@ -7,14 +7,15 @@ package io.opentelemetry.sdk.metrics.aggregator;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.Resource;
final class SumAggregatorFactory implements AggregatorFactory { final class SumAggregatorFactory implements AggregatorFactory {
private final boolean alwaysCumulative;
SumAggregatorFactory(boolean alwaysCumulative) { private final AggregationTemporality temporality;
this.alwaysCumulative = alwaysCumulative;
SumAggregatorFactory(AggregationTemporality temporality) {
this.temporality = temporality;
} }
@Override @Override
@ -23,18 +24,13 @@ final class SumAggregatorFactory implements AggregatorFactory {
Resource resource, Resource resource,
InstrumentationLibraryInfo instrumentationLibraryInfo, InstrumentationLibraryInfo instrumentationLibraryInfo,
InstrumentDescriptor descriptor) { InstrumentDescriptor descriptor) {
boolean stateful = alwaysCumulative;
if (descriptor.getType() == InstrumentType.SUM_OBSERVER
|| descriptor.getType() == InstrumentType.UP_DOWN_SUM_OBSERVER) {
stateful = false;
}
switch (descriptor.getValueType()) { switch (descriptor.getValueType()) {
case LONG: case LONG:
return (Aggregator<T>) return (Aggregator<T>)
new LongSumAggregator(resource, instrumentationLibraryInfo, descriptor, stateful); new LongSumAggregator(resource, instrumentationLibraryInfo, descriptor, temporality);
case DOUBLE: case DOUBLE:
return (Aggregator<T>) return (Aggregator<T>)
new DoubleSumAggregator(resource, instrumentationLibraryInfo, descriptor, stateful); new DoubleSumAggregator(resource, instrumentationLibraryInfo, descriptor, temporality);
} }
throw new IllegalArgumentException("Invalid instrument value type"); throw new IllegalArgumentException("Invalid instrument value type");
} }

View File

@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.TestClock; import io.opentelemetry.sdk.internal.TestClock;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSumData;
import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections; import java.util.Collections;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -29,13 +33,14 @@ class DoubleSumObserverSdkTest {
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(DoubleSumObserverSdkTest.class.getName(), null); InstrumentationLibraryInfo.create(DoubleSumObserverSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create(); private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider = private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test @Test
void collectMetrics_NoCallback() { void collectMetrics_NoCallback() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleSumObserverBuilder("testObserver") .doubleSumObserverBuilder("testObserver")
.setDescription("My own DoubleSumObserver") .setDescription("My own DoubleSumObserver")
.setUnit("ms") .setUnit("ms")
@ -45,7 +50,9 @@ class DoubleSumObserverSdkTest {
@Test @Test
void collectMetrics_NoRecords() { void collectMetrics_NoRecords() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleSumObserverBuilder("testObserver") .doubleSumObserverBuilder("testObserver")
.setDescription("My own DoubleSumObserver") .setDescription("My own DoubleSumObserver")
.setUnit("ms") .setUnit("ms")
@ -56,7 +63,9 @@ class DoubleSumObserverSdkTest {
@Test @Test
void collectMetrics_WithOneRecord() { void collectMetrics_WithOneRecord() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleSumObserverBuilder("testObserver") .doubleSumObserverBuilder("testObserver")
.setDescription("My own DoubleSumObserver") .setDescription("My own DoubleSumObserver")
.setUnit("ms") .setUnit("ms")
@ -99,4 +108,60 @@ class DoubleSumObserverSdkTest {
Labels.of("k", "v"), Labels.of("k", "v"),
12.1d))))); 12.1d)))));
} }
@Test
void collectMetrics_DeltaSumAggregator() {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))
.build())
.build();
sdkMeterProvider
.get(getClass().getName())
.doubleSumObserverBuilder("testObserver")
.setDescription("My own DoubleSumObserver")
.setUnit("ms")
.setUpdater(result -> result.observe(12.1d, Labels.of("k", "v")))
.build();
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createDoubleSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"My own DoubleSumObserver",
"ms",
DoubleSumData.create(
/* isMonotonic= */ true,
AggregationTemporality.DELTA,
Collections.singletonList(
DoublePointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
12.1d)))));
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createDoubleSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"My own DoubleSumObserver",
"ms",
DoubleSumData.create(
/* isMonotonic= */ true,
AggregationTemporality.DELTA,
Collections.singletonList(
DoublePointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
0)))));
}
} }

View File

@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.TestClock; import io.opentelemetry.sdk.internal.TestClock;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData; import io.opentelemetry.sdk.metrics.data.DoubleSumData;
import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections; import java.util.Collections;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -29,13 +33,14 @@ class DoubleUpDownSumObserverSdkTest {
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(DoubleUpDownSumObserverSdkTest.class.getName(), null); InstrumentationLibraryInfo.create(DoubleUpDownSumObserverSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create(); private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider = private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test @Test
void collectMetrics_NoCallback() { void collectMetrics_NoCallback() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleUpDownSumObserverBuilder("testObserver") .doubleUpDownSumObserverBuilder("testObserver")
.setDescription("My own DoubleUpDownSumObserver") .setDescription("My own DoubleUpDownSumObserver")
.setUnit("ms") .setUnit("ms")
@ -45,7 +50,9 @@ class DoubleUpDownSumObserverSdkTest {
@Test @Test
void collectMetrics_NoRecords() { void collectMetrics_NoRecords() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleUpDownSumObserverBuilder("testObserver") .doubleUpDownSumObserverBuilder("testObserver")
.setDescription("My own DoubleUpDownSumObserver") .setDescription("My own DoubleUpDownSumObserver")
.setUnit("ms") .setUnit("ms")
@ -56,7 +63,9 @@ class DoubleUpDownSumObserverSdkTest {
@Test @Test
void collectMetrics_WithOneRecord() { void collectMetrics_WithOneRecord() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.doubleUpDownSumObserverBuilder("testObserver") .doubleUpDownSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12.1d, Labels.of("k", "v"))) .setUpdater(result -> result.observe(12.1d, Labels.of("k", "v")))
.build(); .build();
@ -97,4 +106,60 @@ class DoubleUpDownSumObserverSdkTest {
Labels.of("k", "v"), Labels.of("k", "v"),
12.1d))))); 12.1d)))));
} }
@Test
void collectMetrics_DeltaSumAggregator() {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))
.build())
.build();
sdkMeterProvider
.get(getClass().getName())
.doubleUpDownSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12.1d, Labels.of("k", "v")))
.build();
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createDoubleSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
DoubleSumData.create(
/* isMonotonic= */ false,
AggregationTemporality.DELTA,
Collections.singletonList(
DoublePointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
12.1d)))));
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createDoubleSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
DoubleSumData.create(
/* isMonotonic= */ false,
AggregationTemporality.DELTA,
Collections.singletonList(
DoublePointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
0)))));
}
} }

View File

@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.TestClock; import io.opentelemetry.sdk.internal.TestClock;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData; import io.opentelemetry.sdk.metrics.data.LongSumData;
import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections; import java.util.Collections;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -29,13 +33,14 @@ class LongSumObserverSdkTest {
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(LongSumObserverSdkTest.class.getName(), null); InstrumentationLibraryInfo.create(LongSumObserverSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create(); private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider = private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test @Test
void collectMetrics_NoCallback() { void collectMetrics_NoCallback() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longSumObserverBuilder("testObserver") .longSumObserverBuilder("testObserver")
.setDescription("My own LongSumObserver") .setDescription("My own LongSumObserver")
.setUnit("ms") .setUnit("ms")
@ -45,7 +50,9 @@ class LongSumObserverSdkTest {
@Test @Test
void collectMetrics_NoRecords() { void collectMetrics_NoRecords() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longSumObserverBuilder("testObserver") .longSumObserverBuilder("testObserver")
.setDescription("My own LongSumObserver") .setDescription("My own LongSumObserver")
.setUnit("ms") .setUnit("ms")
@ -56,7 +63,9 @@ class LongSumObserverSdkTest {
@Test @Test
void collectMetrics_WithOneRecord() { void collectMetrics_WithOneRecord() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longSumObserverBuilder("testObserver") .longSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12, Labels.of("k", "v"))) .setUpdater(result -> result.observe(12, Labels.of("k", "v")))
.build(); .build();
@ -97,4 +106,58 @@ class LongSumObserverSdkTest {
Labels.of("k", "v"), Labels.of("k", "v"),
12))))); 12)))));
} }
@Test
void collectMetrics_DeltaSumAggregator() {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder().setInstrumentType(InstrumentType.SUM_OBSERVER).build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))
.build())
.build();
sdkMeterProvider
.get(getClass().getName())
.longSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12, Labels.of("k", "v")))
.build();
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createLongSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
LongSumData.create(
/* isMonotonic= */ true,
AggregationTemporality.DELTA,
Collections.singletonList(
LongPointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
12)))));
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createLongSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
LongSumData.create(
/* isMonotonic= */ true,
AggregationTemporality.DELTA,
Collections.singletonList(
LongPointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
0)))));
}
} }

View File

@ -9,14 +9,18 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.TestClock; import io.opentelemetry.sdk.internal.TestClock;
import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData; import io.opentelemetry.sdk.metrics.data.LongSumData;
import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.processor.LabelsProcessorFactory;
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections; import java.util.Collections;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -29,13 +33,14 @@ class LongUpDownSumObserverSdkTest {
private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO = private static final InstrumentationLibraryInfo INSTRUMENTATION_LIBRARY_INFO =
InstrumentationLibraryInfo.create(LongUpDownSumObserverSdkTest.class.getName(), null); InstrumentationLibraryInfo.create(LongUpDownSumObserverSdkTest.class.getName(), null);
private final TestClock testClock = TestClock.create(); private final TestClock testClock = TestClock.create();
private final SdkMeterProvider sdkMeterProvider = private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE).build(); SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
@Test @Test
void collectMetrics_NoCallback() { void collectMetrics_NoCallback() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longUpDownSumObserverBuilder("testObserver") .longUpDownSumObserverBuilder("testObserver")
.setDescription("My own LongUpDownSumObserver") .setDescription("My own LongUpDownSumObserver")
.setUnit("ms") .setUnit("ms")
@ -45,7 +50,9 @@ class LongUpDownSumObserverSdkTest {
@Test @Test
void collectMetrics_NoRecords() { void collectMetrics_NoRecords() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longUpDownSumObserverBuilder("testObserver") .longUpDownSumObserverBuilder("testObserver")
.setDescription("My own LongUpDownSumObserver") .setDescription("My own LongUpDownSumObserver")
.setUnit("ms") .setUnit("ms")
@ -56,7 +63,9 @@ class LongUpDownSumObserverSdkTest {
@Test @Test
void collectMetrics_WithOneRecord() { void collectMetrics_WithOneRecord() {
sdkMeter SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
sdkMeterProvider
.get(getClass().getName())
.longUpDownSumObserverBuilder("testObserver") .longUpDownSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12, Labels.of("k", "v"))) .setUpdater(result -> result.observe(12, Labels.of("k", "v")))
.build(); .build();
@ -97,4 +106,60 @@ class LongUpDownSumObserverSdkTest {
Labels.of("k", "v"), Labels.of("k", "v"),
12))))); 12)))));
} }
@Test
void collectMetrics_DeltaSumAggregator() {
SdkMeterProvider sdkMeterProvider =
sdkMeterProviderBuilder
.registerView(
InstrumentSelector.builder()
.setInstrumentType(InstrumentType.UP_DOWN_SUM_OBSERVER)
.build(),
View.builder()
.setLabelsProcessorFactory(LabelsProcessorFactory.noop())
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))
.build())
.build();
sdkMeterProvider
.get(getClass().getName())
.longUpDownSumObserverBuilder("testObserver")
.setUpdater(result -> result.observe(12, Labels.of("k", "v")))
.build();
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createLongSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
LongSumData.create(
/* isMonotonic= */ false,
AggregationTemporality.DELTA,
Collections.singletonList(
LongPointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
12)))));
testClock.advanceNanos(SECOND_NANOS);
assertThat(sdkMeterProvider.collectAllMetrics())
.containsExactly(
MetricData.createLongSum(
RESOURCE,
INSTRUMENTATION_LIBRARY_INFO,
"testObserver",
"",
"1",
LongSumData.create(
/* isMonotonic= */ false,
AggregationTemporality.DELTA,
Collections.singletonList(
LongPointData.create(
testClock.now() - SECOND_NANOS,
testClock.now(),
Labels.of("k", "v"),
0)))));
}
} }

View File

@ -165,7 +165,9 @@ public class SdkMeterProviderTest {
void collectAllSyncInstruments_OverwriteTemporality() { void collectAllSyncInstruments_OverwriteTemporality() {
sdkMeterProviderBuilder.registerView( sdkMeterProviderBuilder.registerView(
InstrumentSelector.builder().setInstrumentType(InstrumentType.COUNTER).build(), InstrumentSelector.builder().setInstrumentType(InstrumentType.COUNTER).build(),
View.builder().setAggregatorFactory(AggregatorFactory.sum(false)).build()); View.builder()
.setAggregatorFactory(AggregatorFactory.sum(AggregationTemporality.DELTA))
.build());
SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build(); SdkMeterProvider sdkMeterProvider = sdkMeterProviderBuilder.build();
Meter sdkMeter = sdkMeterProvider.get(SdkMeterProviderTest.class.getName()); Meter sdkMeter = sdkMeterProvider.get(SdkMeterProviderTest.class.getName());

View File

@ -102,7 +102,7 @@ class AggregatorFactoryTest {
@Test @Test
void getSumAggregatorFactory() { void getSumAggregatorFactory() {
AggregatorFactory sum = AggregatorFactory.sum(false); AggregatorFactory sum = AggregatorFactory.sum(AggregationTemporality.DELTA);
assertThat( assertThat(
sum.create( sum.create(
Resource.getDefault(), Resource.getDefault(),

View File

@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.aggregator.AbstractSumAggregator.MergeStrategy;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
@ -28,7 +29,7 @@ class DoubleSumAggregatorTest {
InstrumentationLibraryInfo.empty(), InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create( InstrumentDescriptor.create(
"name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE), "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.DOUBLE),
/* stateful= */ true); AggregationTemporality.CUMULATIVE);
@Test @Test
void createHandle() { void createHandle() {
@ -74,6 +75,29 @@ class DoubleSumAggregatorTest {
assertThat(aggregatorHandle.accumulateThenReset()).isNull(); assertThat(aggregatorHandle.accumulateThenReset()).isNull();
} }
@Test
void merge() {
for (InstrumentType instrumentType : InstrumentType.values()) {
for (AggregationTemporality temporality : AggregationTemporality.values()) {
DoubleSumAggregator aggregator =
new DoubleSumAggregator(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "unit", instrumentType, InstrumentValueType.LONG),
temporality);
MergeStrategy expectedMergeStrategy =
AbstractSumAggregator.resolveMergeStrategy(instrumentType, temporality);
double merged = aggregator.merge(1.0d, 2.0d);
assertThat(merged)
.withFailMessage(
"Invalid merge result for instrumentType %s, temporality %s: %s",
instrumentType, temporality, merged)
.isEqualTo(expectedMergeStrategy == MergeStrategy.SUM ? 3.0d : 1.0d);
}
}
}
@Test @Test
void toMetricData() { void toMetricData() {
AggregatorHandle<Double> aggregatorHandle = aggregator.createHandle(); AggregatorHandle<Double> aggregatorHandle = aggregator.createHandle();

View File

@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.metrics.common.Labels; import io.opentelemetry.api.metrics.common.Labels;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.aggregator.AbstractSumAggregator.MergeStrategy;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType; import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType; import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
@ -28,7 +29,7 @@ class LongSumAggregatorTest {
InstrumentationLibraryInfo.empty(), InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create( InstrumentDescriptor.create(
"name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.LONG), "name", "description", "unit", InstrumentType.COUNTER, InstrumentValueType.LONG),
/* stateful= */ true); AggregationTemporality.CUMULATIVE);
@Test @Test
void createHandle() { void createHandle() {
@ -76,6 +77,29 @@ class LongSumAggregatorTest {
assertThat(aggregatorHandle.accumulateThenReset()).isNull(); assertThat(aggregatorHandle.accumulateThenReset()).isNull();
} }
@Test
void merge() {
for (InstrumentType instrumentType : InstrumentType.values()) {
for (AggregationTemporality temporality : AggregationTemporality.values()) {
LongSumAggregator aggregator =
new LongSumAggregator(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "unit", instrumentType, InstrumentValueType.LONG),
temporality);
MergeStrategy expectedMergeStrategy =
AbstractSumAggregator.resolveMergeStrategy(instrumentType, temporality);
long merged = aggregator.merge(1L, 2L);
assertThat(merged)
.withFailMessage(
"Invalid merge result for instrumentType %s, temporality %s: %s",
instrumentType, temporality, merged)
.isEqualTo(expectedMergeStrategy == MergeStrategy.SUM ? 3 : 1);
}
}
}
@Test @Test
void toMetricData() { void toMetricData() {
AggregatorHandle<Long> aggregatorHandle = aggregator.createHandle(); AggregatorHandle<Long> aggregatorHandle = aggregator.createHandle();