Fix Clock usage in Metrics SDK and slight optimisation to synchronous collection (#3689)

* Add timeout between delta collection to avoid rapid collection cycles across multiple readers.

* Fix issue where exemplars were not sampling wall-clock time.

* Fix synchronous collection suppression to track duration using Clock.nanoTime

* Fixes from review.

* Remove leftover TODO.
This commit is contained in:
Josh Suereth 2021-10-07 09:51:46 -04:00 committed by GitHub
parent 741743e037
commit 97bbaa6a64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 110 additions and 36 deletions

View File

@ -24,7 +24,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -45,6 +47,12 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
private final Set<CollectionHandle> collectors;
private final List<MetricReader> readers;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;
// Minimum amount of time we allow between synchronous collections.
// This is meant to reduce overhead when multiple exporters attempt to read metrics quickly.
// TODO: This should be configurable at the SDK level.
private static final long MINIMUM_COLLECTION_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
DefaultSdkMeterProvider(
List<MetricReaderFactory> readerFactories,
@ -57,6 +65,8 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo));
this.lastCollectionTimestamp =
new AtomicLong(clock.nanoTime() - MINIMUM_COLLECTION_INTERVAL_NANOS);
// Here we construct our own unique handle ids for this SDK.
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
@ -120,12 +130,24 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
@Override
public Collection<MetricData> collectAllMetrics() {
Collection<SdkMeter> meters = registry.getComponents();
// TODO: This can be made more efficient by passing the list through the collection and
// appending
// rather than allocating individual lists and concatenating.
// Suppress too-frequent-collection.
long currentNanoTime = sharedState.getClock().nanoTime();
long pastNanoTime = lastCollectionTimestamp.get();
// It hasn't been long enough since the last collection.
boolean disableSynchronousCollection =
(currentNanoTime - pastNanoTime) < MINIMUM_COLLECTION_INTERVAL_NANOS;
// If we're not disabling metrics, write the current collection time.
// We don't care if this happens in more than one thread: suppression is optimistic, and the
// interval is small enough that some jitter isn't important.
if (!disableSynchronousCollection) {
lastCollectionTimestamp.lazySet(currentNanoTime);
}
List<MetricData> result = new ArrayList<>(meters.size());
for (SdkMeter meter : meters) {
result.addAll(meter.collectAll(handle, collectors, sharedState.getClock().now()));
result.addAll(
meter.collectAll(
handle, collectors, sharedState.getClock().now(), disableSynchronousCollection));
}
return Collections.unmodifiableCollection(result);
}

View File

@ -37,9 +37,16 @@ final class SdkMeter implements Meter {
/** Collects all the metric recordings that changed since the previous call. */
Collection<MetricData> collectAll(
CollectionHandle collector, Set<CollectionHandle> allCollectors, long epochNanos) {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long epochNanos,
boolean suppressSynchronousCollection) {
return meterSharedState.collectAll(
collector, allCollectors, meterProviderSharedState, epochNanos);
collector,
allCollectors,
meterProviderSharedState,
epochNanos,
suppressSynchronousCollection);
}
@Override

View File

@ -103,7 +103,7 @@ abstract class AbstractFixedSizeExemplarReservoir implements ExemplarReservoir {
this.value = value;
this.attributes = attributes;
// Note: It may make sense in the future to attempt to pull this from an active span.
this.recordTime = clock.nanoTime();
this.recordTime = clock.now();
updateFromContext(context);
}

View File

@ -151,7 +151,8 @@ public final class AsynchronousMetricStorage<T> implements MetricStorage {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long startEpochNanos,
long epochNanos) {
long epochNanos,
boolean suppressSynchronousCollection) {
collectLock.lock();
try {
metricUpdater.run();

View File

@ -98,8 +98,10 @@ public final class DefaultSynchronousMetricStorage<T> implements SynchronousMetr
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long startEpochNanos,
long epochNanos) {
Map<Attributes, T> result = deltaMetricStorage.collectFor(collector, allCollectors);
long epochNanos,
boolean suppressSynchronousCollection) {
Map<Attributes, T> result =
deltaMetricStorage.collectFor(collector, allCollectors, suppressSynchronousCollection);
return temporalMetricStorage.buildMetricFor(collector, result, startEpochNanos, epochNanos);
}

View File

@ -71,12 +71,16 @@ class DeltaMetricStorage<T> {
*
* @param collector The current reader of metrics.
* @param collectors All possible readers of metrics.
* @param suppressCollection If true, don't actively pull synchronous instruments; measurements
* should be up to date.
* @return The delta accumulation of metrics since the last read of the specified reader.
*/
public synchronized Map<Attributes, T> collectFor(
CollectionHandle collector, Set<CollectionHandle> collectors) {
CollectionHandle collector, Set<CollectionHandle> collectors, boolean suppressCollection) {
// First we force a collection
if (!suppressCollection) {
collectSynchronousDeltaAccumulationAndReset();
}
// Now build a delta result.
Map<Attributes, T> result = new HashMap<>();
for (DeltaAccumulation<T> point : unreportedDeltas) {
@ -97,6 +101,7 @@ class DeltaMetricStorage<T> {
* related stale concurrent-map handles will occur. Any {@code null} measurements are ignored.
*/
private synchronized void collectSynchronousDeltaAccumulationAndReset() {
// Grab accumulated measurements.
Map<Attributes, T> result = new HashMap<>();
for (Map.Entry<Attributes, AggregatorHandle<T>> entry : activeCollectionStorage.entrySet()) {
boolean unmappedEntry = entry.getValue().tryUnmap();

View File

@ -47,7 +47,8 @@ final class EmptyMetricStorage implements SynchronousMetricStorage {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long startEpochNanos,
long epochNanos) {
long epochNanos,
boolean suppressSynchronousCollection) {
return null;
}
}

View File

@ -50,13 +50,18 @@ public abstract class MeterSharedState {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
MeterProviderSharedState meterProviderSharedState,
long epochNanos) {
long epochNanos,
boolean suppressSynchronousCollection) {
Collection<MetricStorage> metrics = getMetricStorageRegistry().getMetrics();
List<MetricData> result = new ArrayList<>(metrics.size());
for (MetricStorage metric : metrics) {
MetricData current =
metric.collectAndReset(
collector, allCollectors, meterProviderSharedState.getStartEpochNanos(), epochNanos);
collector,
allCollectors,
meterProviderSharedState.getStartEpochNanos(),
epochNanos,
suppressSynchronousCollection);
if (current != null) {
result.add(current);
}

View File

@ -31,6 +31,8 @@ public interface MetricStorage {
* @param allCollectors The set of all registered readers for metrics.
* @param startEpochNanos The start timestamp for this SDK.
* @param epochNanos The timestamp for this collection.
* @param suppressSynchronousCollection Whether or not to suppress active (blocking) collection of
* metrics, meaning recently collected data is "fresh enough"
* @return The {@link MetricData} from this collection period, or {@code null}.
*/
@Nullable
@ -38,5 +40,6 @@ public interface MetricStorage {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long startEpochNanos,
long epochNanos);
long epochNanos,
boolean suppressSynchronousCollection);
}

View File

@ -183,7 +183,7 @@ public class SdkMeterProviderTest {
LongCounter longCounter = sdkMeter.counterBuilder("testLongCounter").build();
longCounter.add(10, Attributes.empty());
testClock.advance(Duration.ofNanos(50));
testClock.advance(Duration.ofSeconds(1));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
@ -198,13 +198,13 @@ public class SdkMeterProviderTest {
.satisfiesExactly(
point ->
assertThat(point)
.hasStartEpochNanos(testClock.now() - 50)
.hasStartEpochNanos(testClock.now() - 1000000000)
.hasEpochNanos(testClock.now())
.hasAttributes(Attributes.empty())
.hasBucketCounts(1)));
longCounter.add(10, Attributes.empty());
testClock.advance(Duration.ofNanos(50));
testClock.advance(Duration.ofSeconds(1));
assertThat(sdkMeterReader.collectAllMetrics())
.satisfiesExactly(
@ -218,7 +218,7 @@ public class SdkMeterProviderTest {
.satisfiesExactly(
point ->
assertThat(point)
.hasStartEpochNanos(testClock.now() - 50)
.hasStartEpochNanos(testClock.now() - 1000000000)
.hasEpochNanos(testClock.now())
.hasAttributes(Attributes.empty())
.hasBucketCounts(1)));
@ -251,7 +251,7 @@ public class SdkMeterProviderTest {
sdkMeter.histogramBuilder("testDoubleValueRecorder").build();
doubleValueRecorder.record(10, Attributes.empty());
testClock.advance(Duration.ofNanos(50));
testClock.advance(Duration.ofSeconds(1));
assertThat(sdkMeterReader.collectAllMetrics())
.allSatisfy(
@ -267,7 +267,7 @@ public class SdkMeterProviderTest {
.satisfiesExactlyInAnyOrder(
point ->
assertThat(point)
.hasStartEpochNanos(testClock.now() - 50)
.hasStartEpochNanos(testClock.now() - 1000000000)
.hasEpochNanos(testClock.now())
.hasAttributes(Attributes.empty())
.hasBucketCounts(1)))
@ -280,7 +280,7 @@ public class SdkMeterProviderTest {
"testLongValueRecorder",
"testDoubleValueRecorder");
testClock.advance(Duration.ofNanos(50));
testClock.advance(Duration.ofSeconds(1));
longCounter.add(10, Attributes.empty());
longUpDownCounter.add(10, Attributes.empty());
@ -303,7 +303,7 @@ public class SdkMeterProviderTest {
.satisfiesExactlyInAnyOrder(
point ->
assertThat(point)
.hasStartEpochNanos(testClock.now() - 50)
.hasStartEpochNanos(testClock.now() - 1000000000)
.hasEpochNanos(testClock.now())
.hasAttributes(Attributes.empty())
.hasBucketCounts(1)))

View File

@ -74,7 +74,7 @@ public class AsynchronousMetricStorageTest {
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
value -> value.observe(1.0, Attributes.empty()))
.collectAndReset(handle, all, 0, testClock.now());
.collectAndReset(handle, all, 0, testClock.now(), false);
Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
@ -91,7 +91,7 @@ public class AsynchronousMetricStorageTest {
meterProviderSharedState.getResource(),
meterSharedState.getInstrumentationLibraryInfo(),
value -> value.observe(1, Attributes.empty()))
.collectAndReset(handle, all, 0, testClock.nanoTime());
.collectAndReset(handle, all, 0, testClock.nanoTime(), false);
Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
}

View File

@ -63,25 +63,49 @@ class DeltaMetricStorageTest {
BoundStorageHandle bound = storage.bind(Attributes.empty());
bound.recordDouble(1, Attributes.empty(), Context.root());
// First collector only sees first recording.
assertThat(storage.collectFor(collector1, allCollectors))
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1));
bound.recordDouble(2, Attributes.empty(), Context.root());
// First collector only sees second recording.
assertThat(storage.collectFor(collector1, allCollectors))
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(2));
// First collector no longer sees a recording.
assertThat(storage.collectFor(collector1, allCollectors)).isEmpty();
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false))
.isEmpty();
// Second collector gets merged recordings
assertThat(storage.collectFor(collector2, allCollectors))
assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ false))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(3));
// Second collector no longer sees a recording.
assertThat(storage.collectFor(collector2, allCollectors)).isEmpty();
assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ false))
.isEmpty();
}
@Test
void avoidCollectionInRapidSuccession() {
// Verifies that a suppressed collection only reports deltas gathered by an earlier
// unsuppressed collection, and that newer recordings surface once suppression is lifted.
BoundStorageHandle bound = storage.bind(Attributes.empty());
bound.recordDouble(1, Attributes.empty(), Context.root());
// An unsuppressed collection for collector1 sees the first recording.
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1));
// Add some data immediately after the read, but pretend it hasn't been long since the last
// collection by passing suppressCollection=true below.
bound.recordDouble(2, Attributes.empty(), Context.root());
// Collector1 doesn't see the new data because suppressed collection does not re-collect,
// while collector2 still sees the earlier delta that it has not yet read.
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ true))
.isEmpty();
assertThat(storage.collectFor(collector2, allCollectors, /* suppressCollection=*/ true))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(1));
// Once collection is no longer suppressed (standing in for enough time having passed),
// collector1 sees the new data.
assertThat(storage.collectFor(collector1, allCollectors, /* suppressCollection=*/ false))
.hasSize(1)
.hasEntrySatisfying(Attributes.empty(), value -> assertThat(value.getValue()).isEqualTo(2));
}
}

View File

@ -87,7 +87,8 @@ class MetricStorageRegistryTest {
CollectionHandle collector,
Set<CollectionHandle> all,
long startEpochNanos,
long epochNanos) {
long epochNanos,
boolean suppressSynchronousCollection) {
return null;
}
@ -115,7 +116,8 @@ class MetricStorageRegistryTest {
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long startEpochNanos,
long epochNanos) {
long epochNanos,
boolean suppressSynchronousCollection) {
return null;
}

View File

@ -75,7 +75,8 @@ public class SynchronousMetricStorageTest {
new DefaultSynchronousMetricStorage<>(METRIC_DESCRIPTOR, aggregator, spyLabelsProcessor);
BoundStorageHandle handle = accumulator.bind(labels);
handle.recordDouble(1, labels, Context.root());
MetricData md = accumulator.collectAndReset(collector, allCollectors, 0, testClock.now());
MetricData md =
accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false);
assertThat(md)
.hasDoubleGauge()
.points()
@ -97,7 +98,7 @@ public class SynchronousMetricStorageTest {
accumulator.bind(Attributes.builder().put("K", "V").build());
try {
assertThat(duplicateHandle).isSameAs(handle);
accumulator.collectAndReset(collector, allCollectors, 0, testClock.now());
accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false);
BoundStorageHandle anotherDuplicateAggregatorHandle =
accumulator.bind(Attributes.builder().put("K", "V").build());
try {
@ -112,6 +113,7 @@ public class SynchronousMetricStorageTest {
// If we try to collect once all bound references are gone AND no recordings have occurred, we
// should not see any labels (or metric).
assertThat(accumulator.collectAndReset(collector, allCollectors, 0, testClock.now())).isNull();
assertThat(accumulator.collectAndReset(collector, allCollectors, 0, testClock.now(), false))
.isNull();
}
}