diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/DefaultSdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/DefaultSdkMeterProvider.java index ba075e0824..62c1b29fd7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/DefaultSdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/DefaultSdkMeterProvider.java @@ -24,7 +24,9 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -45,6 +47,12 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider { private final Set collectors; private final List readers; private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicLong lastCollectionTimestamp; + + // Minimum amount of time we allow between synchronous collections. + // This meant to reduce overhead when multiple exporters attempt to read metrics quickly. + // TODO: This should be configurable at the SDK level. + private static final long MINIMUM_COLLECTION_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100); DefaultSdkMeterProvider( List readerFactories, @@ -57,6 +65,8 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider { this.registry = new ComponentRegistry<>( instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo)); + this.lastCollectionTimestamp = + new AtomicLong(clock.nanoTime() - MINIMUM_COLLECTION_INTERVAL_NANOS); // Here we construct our own unique handle ids for this SDK. // These are guaranteed to be unique per-reader for this SDK, and only this SDK. @@ -120,12 +130,24 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider { @Override public Collection collectAllMetrics() { Collection meters = registry.getComponents(); - // TODO: This can be made more efficient by passing the list through the collection and - // appending - // rather than allocating individual lists and concatenating. + // Suppress too-frequent-collection. + long currentNanoTime = sharedState.getClock().nanoTime(); + long pastNanoTime = lastCollectionTimestamp.get(); + // It hasn't been long enough since the last collection. + boolean disableSynchronousCollection = + (currentNanoTime - pastNanoTime) < MINIMUM_COLLECTION_INTERVAL_NANOS; + // If we're not disabling metrics, write the current collection time. + // We don't care if this happens in more than one thread, suppression is optimistic, and the + // interval is small enough some jitter isn't important. + if (!disableSynchronousCollection) { + lastCollectionTimestamp.lazySet(currentNanoTime); + } + List result = new ArrayList<>(meters.size()); for (SdkMeter meter : meters) { - result.addAll(meter.collectAll(handle, collectors, sharedState.getClock().now())); + result.addAll( + meter.collectAll( + handle, collectors, sharedState.getClock().now(), disableSynchronousCollection)); } return Collections.unmodifiableCollection(result); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java index 7bdeaa421e..e6091ac362 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java @@ -37,9 +37,16 @@ final class SdkMeter implements Meter { /** Collects all the metric recordings that changed since the previous call. */ Collection collectAll( - CollectionHandle collector, Set allCollectors, long epochNanos) { + CollectionHandle collector, + Set allCollectors, + long epochNanos, + boolean suppressSynchronousCollection) { return meterSharedState.collectAll( - collector, allCollectors, meterProviderSharedState, epochNanos); + collector, + allCollectors, + meterProviderSharedState, + epochNanos, + suppressSynchronousCollection); } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/exemplar/AbstractFixedSizeExemplarReservoir.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/exemplar/AbstractFixedSizeExemplarReservoir.java index 7e6cfc220b..5ee70221e5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/exemplar/AbstractFixedSizeExemplarReservoir.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/exemplar/AbstractFixedSizeExemplarReservoir.java @@ -103,7 +103,7 @@ abstract class AbstractFixedSizeExemplarReservoir implements ExemplarReservoir { this.value = value; this.attributes = attributes; // Note: It may make sense in the future to attempt to pull this from an active span. - this.recordTime = clock.nanoTime(); + this.recordTime = clock.now(); updateFromContext(context); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 5cb6758adf..d551b34e08 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -151,7 +151,8 @@ public final class AsynchronousMetricStorage implements MetricStorage { CollectionHandle collector, Set allCollectors, long startEpochNanos, - long epochNanos) { + long epochNanos, + boolean suppressSynchronousCollection) { collectLock.lock(); try { metricUpdater.run(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 18ce799f87..c43d7b0c29 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -98,8 +98,10 @@ public final class DefaultSynchronousMetricStorage implements SynchronousMetr CollectionHandle collector, Set allCollectors, long startEpochNanos, - long epochNanos) { - Map result = deltaMetricStorage.collectFor(collector, allCollectors); + long epochNanos, + boolean suppressSynchronousCollection) { + Map result = + deltaMetricStorage.collectFor(collector, allCollectors, suppressSynchronousCollection); return temporalMetricStorage.buildMetricFor(collector, result, startEpochNanos, epochNanos); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java index b360298131..1d58ab8cfb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorage.java @@ -71,12 +71,16 @@ class DeltaMetricStorage { * * @param collector The current reader of metrics. * @param collectors All possible readers of metrics. + * @param suppressCollection If true, don't actively pull synchronous instruments, measurements + * should be up to date. * @return The delta accumulation of metrics since the last read of a the specified reader. */ public synchronized Map collectFor( - CollectionHandle collector, Set collectors) { + CollectionHandle collector, Set collectors, boolean suppressCollection) { // First we force a collection - collectSynchronousDeltaAccumulationAndReset(); + if (!suppressCollection) { + collectSynchronousDeltaAccumulationAndReset(); + } // Now build a delta result. Map result = new HashMap<>(); for (DeltaAccumulation point : unreportedDeltas) { @@ -97,6 +101,7 @@ class DeltaMetricStorage { * related stale concurrent-map handles will occur. Any {@code null} measurements are ignored. */ private synchronized void collectSynchronousDeltaAccumulationAndReset() { + // Grab accumulated measurements. Map result = new HashMap<>(); for (Map.Entry> entry : activeCollectionStorage.entrySet()) { boolean unmappedEntry = entry.getValue().tryUnmap(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java index 722fc99137..a69ea79bab 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/EmptyMetricStorage.java @@ -47,7 +47,8 @@ final class EmptyMetricStorage implements SynchronousMetricStorage { CollectionHandle collector, Set allCollectors, long startEpochNanos, - long epochNanos) { + long epochNanos, + boolean suppressSynchronousCollection) { return null; } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java index 41775bcb49..55a14e31cc 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java @@ -50,13 +50,18 @@ public abstract class MeterSharedState { CollectionHandle collector, Set allCollectors, MeterProviderSharedState meterProviderSharedState, - long epochNanos) { + long epochNanos, + boolean suppressSynchronousCollection) { Collection metrics = getMetricStorageRegistry().getMetrics(); List result = new ArrayList<>(metrics.size()); for (MetricStorage metric : metrics) { MetricData current = metric.collectAndReset( - collector, allCollectors, meterProviderSharedState.getStartEpochNanos(), epochNanos); + collector, + allCollectors, + meterProviderSharedState.getStartEpochNanos(), + epochNanos, + suppressSynchronousCollection); if (current != null) { result.add(current); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java index 7d5ce1d580..14a7bf8143 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java @@ -31,6 +31,8 @@ public interface MetricStorage { * @param allCollectors The set of all registered readers for metrics. * @param startEpochNanos The start timestamp for this SDK. * @param epochNanos The timestamp for this collection. + * @param suppressSynchronousCollection Whether or not to suppress active (blocking) collection of + * metrics, meaning recently collected data is "fresh enough" * @return The {@link MetricData} from this collection period, or {@code null}. */ @Nullable @@ -38,5 +40,6 @@ public interface MetricStorage { CollectionHandle collector, Set allCollectors, long startEpochNanos, - long epochNanos); + long epochNanos, + boolean suppressSynchronousCollection); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java index 3b1e7df042..697f220ecf 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java @@ -183,7 +183,7 @@ public class SdkMeterProviderTest { LongCounter longCounter = sdkMeter.counterBuilder("testLongCounter").build(); longCounter.add(10, Attributes.empty()); - testClock.advance(Duration.ofNanos(50)); + testClock.advance(Duration.ofSeconds(1)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( @@ -198,13 +198,13 @@ public class SdkMeterProviderTest { .satisfiesExactly( point -> assertThat(point) - .hasStartEpochNanos(testClock.now() - 50) + .hasStartEpochNanos(testClock.now() - 1000000000) .hasEpochNanos(testClock.now()) .hasAttributes(Attributes.empty()) .hasBucketCounts(1))); longCounter.add(10, Attributes.empty()); - testClock.advance(Duration.ofNanos(50)); + testClock.advance(Duration.ofSeconds(1)); assertThat(sdkMeterReader.collectAllMetrics()) .satisfiesExactly( @@ -218,7 +218,7 @@ public class SdkMeterProviderTest { .satisfiesExactly( point -> assertThat(point) - .hasStartEpochNanos(testClock.now() - 50) + .hasStartEpochNanos(testClock.now() - 1000000000) .hasEpochNanos(testClock.now()) .hasAttributes(Attributes.empty()) .hasBucketCounts(1))); @@ -251,7 +251,7 @@ public class SdkMeterProviderTest { sdkMeter.histogramBuilder("testDoubleValueRecorder").build(); doubleValueRecorder.record(10, Attributes.empty()); - testClock.advance(Duration.ofNanos(50)); + testClock.advance(Duration.ofSeconds(1)); assertThat(sdkMeterReader.collectAllMetrics()) .allSatisfy( @@ -267,7 +267,7 @@ public class SdkMeterProviderTest { .satisfiesExactlyInAnyOrder( point -> assertThat(point) - .hasStartEpochNanos(testClock.now() - 50) + .hasStartEpochNanos(testClock.now() - 1000000000) .hasEpochNanos(testClock.now()) .hasAttributes(Attributes.empty()) .hasBucketCounts(1))) @@ -280,7 +280,7 @@ public class SdkMeterProviderTest { "testLongValueRecorder", "testDoubleValueRecorder"); - testClock.advance(Duration.ofNanos(50)); + testClock.advance(Duration.ofSeconds(1)); longCounter.add(10, Attributes.empty()); longUpDownCounter.add(10, Attributes.empty()); @@ -303,7 +303,7 @@ public class SdkMeterProviderTest { .satisfiesExactlyInAnyOrder( point -> assertThat(point) - .hasStartEpochNanos(testClock.now() - 50) + .hasStartEpochNanos(testClock.now() - 1000000000) .hasEpochNanos(testClock.now()) .hasAttributes(Attributes.empty()) .hasBucketCounts(1))) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index 5b5eefccfb..02677cd2b6 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -74,7 +74,7 @@ public class AsynchronousMetricStorageTest { meterProviderSharedState.getResource(), meterSharedState.getInstrumentationLibraryInfo(), value -> value.observe(1.0, Attributes.empty())) - .collectAndReset(handle, all, 0, testClock.now()); + .collectAndReset(handle, all, 0, testClock.now(), false); Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current()); } @@ -91,7 +91,7 @@ public class AsynchronousMetricStorageTest { meterProviderSharedState.getResource(), meterSharedState.getInstrumentationLibraryInfo(), value -> value.observe(1, Attributes.empty())) - .collectAndReset(handle, all, 0, testClock.nanoTime()); + .collectAndReset(handle, all, 0, testClock.nanoTime(), false); Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current()); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java index cf40929f6a..c21772b710 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/DeltaMetricStorageTest.java @@ -63,25 +63,49 @@ class DeltaMetricStorageTest { BoundStorageHandle bound = storage.bind(Attributes.empty()); bound.recordDouble(1, Attributes.empty(), Context.root()); // First collector only sees first recording. - assertThat(storage.collectFor(collector1, allCollectors)) + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false)) .hasSize(1) .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1)); bound.recordDouble(2, Attributes.empty(), Context.root()); // First collector only sees second recording. - assertThat(storage.collectFor(collector1, allCollectors)) + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false)) .hasSize(1) .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(2)); // First collector no longer sees a recording. - assertThat(storage.collectFor(collector1, allCollectors)).isEmpty(); + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false)) + .isEmpty(); // Second collector gets merged recordings - assertThat(storage.collectFor(collector2, allCollectors)) + assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ false)) .hasSize(1) .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(3)); // Second collector no longer sees a recording. - assertThat(storage.collectFor(collector2, allCollectors)).isEmpty(); + assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ false)) + .isEmpty(); + } + + @Test + void avoidCollectionInRapidSuccession() { + BoundStorageHandle bound = storage.bind(Attributes.empty()); + bound.recordDouble(1, Attributes.empty(), Context.root()); + // First collector only sees first recording. + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false)) + .hasSize(1) + .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1)); + // Add some data immediately after read, but pretent it hasn't been long. + bound.recordDouble(2, Attributes.empty(), Context.root()); + // Collector1 doesn't see new data, because we don't recollect, but collector2 sees old delta. + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ true)) + .isEmpty(); + assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ true)) + .hasSize(1) + .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1)); + // After enough time passes, collector1 sees new data + assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false)) + .hasSize(1) + .hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(2)); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java index 6bb7fc83b0..6b95363820 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java @@ -87,7 +87,8 @@ class MetricStorageRegistryTest { CollectionHandle collector, Set all, long startEpochNanos, - long epochNanos) { + long epochNanos, + boolean suppressSynchronousCollection) { return null; } @@ -115,7 +116,8 @@ class MetricStorageRegistryTest { CollectionHandle collector, Set allCollectors, long startEpochNanos, - long epochNanos) { + long epochNanos, + boolean suppressSynchronousCollection) { return null; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index bc1ff56f40..40d7bb36e1 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -75,7 +75,8 @@ public class SynchronousMetricStorageTest { new DefaultSynchronousMetricStorage<>(METRIC_DESCRIPTOR, aggregator, spyLabelsProcessor); BoundStorageHandle handle = accumulator.bind(labels); handle.recordDouble(1, labels, Context.root()); - MetricData md = accumulator.collectAndReset(collector, allCollectors, 0, testClock.now()); + MetricData md = + accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false); assertThat(md) .hasDoubleGauge() .points() @@ -97,7 +98,7 @@ public class SynchronousMetricStorageTest { accumulator.bind(Attributes.builder().put("K", "V").build()); try { assertThat(duplicateHandle).isSameAs(handle); - accumulator.collectAndReset(collector, allCollectors, 0, testClock.now()); + accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false); BoundStorageHandle anotherDuplicateAggregatorHandle = accumulator.bind(Attributes.builder().put("K", "V").build()); try { @@ -112,6 +113,7 @@ public class SynchronousMetricStorageTest { // If we try to collect once all bound references are gone AND no recordings have occurred, we // should not see any labels (or metric). - assertThat(accumulator.collectAndReset(collector, allCollectors, 0, testClock.now())).isNull(); + assertThat(accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false)) + .isNull(); } }