From 0412d47ffe54a0bc3329e2de86aac7dda0344541 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 18 Dec 2020 09:08:19 -0800 Subject: [PATCH] Simplify logic in AsynchronousInstrumentAccumulator (#2349) Signed-off-by: Bogdan Drutu --- .../AsynchronousInstrumentAccumulator.java | 80 +++++++------------ 1 file changed, 31 insertions(+), 49 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AsynchronousInstrumentAccumulator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AsynchronousInstrumentAccumulator.java index 6609ba5032..fdea0e86e5 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AsynchronousInstrumentAccumulator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AsynchronousInstrumentAccumulator.java @@ -5,7 +5,6 @@ package io.opentelemetry.sdk.metrics; -import io.opentelemetry.api.common.Labels; import io.opentelemetry.api.metrics.AsynchronousInstrument; import io.opentelemetry.sdk.metrics.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -17,34 +16,24 @@ import javax.annotation.Nullable; final class AsynchronousInstrumentAccumulator { private final ReentrantLock collectLock = new ReentrantLock(); private final InstrumentProcessor instrumentProcessor; - private final Consumer metricUpdater; - - private AsynchronousInstrumentAccumulator( - InstrumentProcessor instrumentProcessor, Consumer metricUpdater) { - this.metricUpdater = metricUpdater; - this.instrumentProcessor = instrumentProcessor; - } - - public List collectAll() { - collectLock.lock(); - try { - metricUpdater.accept(instrumentProcessor); - return instrumentProcessor.completeCollectionCycle(); - } finally { - collectLock.unlock(); - } - } + private final Runnable metricUpdater; static AsynchronousInstrumentAccumulator doubleAsynchronousAccumulator( InstrumentProcessor instrumentProcessor, @Nullable Consumer metricUpdater) { // TODO: Decide what to do with null updater. if (metricUpdater == null) { - return new AsynchronousInstrumentAccumulator(instrumentProcessor, instrumentProcessor1 -> {}); + return new AsynchronousInstrumentAccumulator(instrumentProcessor, () -> {}); } - DoubleResultSdk result = new DoubleResultSdk(instrumentProcessor); + AsynchronousInstrument.DoubleResult result = + (value, labels) -> { + Aggregator aggregator = instrumentProcessor.getAggregator(); + aggregator.recordDouble(value); + instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); + }; + return new AsynchronousInstrumentAccumulator( - instrumentProcessor, instrumentProcessor1 -> metricUpdater.accept(result)); + instrumentProcessor, () -> metricUpdater.accept(result)); } static AsynchronousInstrumentAccumulator longAsynchronousAccumulator( @@ -52,40 +41,33 @@ final class AsynchronousInstrumentAccumulator { @Nullable Consumer metricUpdater) { // TODO: Decide what to do with null updater. if (metricUpdater == null) { - return new AsynchronousInstrumentAccumulator(instrumentProcessor, instrumentProcessor1 -> {}); + return new AsynchronousInstrumentAccumulator(instrumentProcessor, () -> {}); } - LongResultSdk result = new LongResultSdk(instrumentProcessor); + + AsynchronousInstrument.LongResult result = + (value, labels) -> { + Aggregator aggregator = instrumentProcessor.getAggregator(); + aggregator.recordLong(value); + instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); + }; + return new AsynchronousInstrumentAccumulator( - instrumentProcessor, instrumentProcessor1 -> metricUpdater.accept(result)); + instrumentProcessor, () -> metricUpdater.accept(result)); } - private static final class DoubleResultSdk implements AsynchronousInstrument.DoubleResult { - private final InstrumentProcessor instrumentProcessor; - - private DoubleResultSdk(InstrumentProcessor instrumentProcessor) { - this.instrumentProcessor = instrumentProcessor; - } - - @Override - public void observe(double sum, Labels labels) { - Aggregator aggregator = instrumentProcessor.getAggregator(); - aggregator.recordDouble(sum); - instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); - } + private AsynchronousInstrumentAccumulator( + InstrumentProcessor instrumentProcessor, Runnable metricUpdater) { + this.metricUpdater = metricUpdater; + this.instrumentProcessor = instrumentProcessor; } - private static final class LongResultSdk implements AsynchronousInstrument.LongResult { - private final InstrumentProcessor instrumentProcessor; - - private LongResultSdk(InstrumentProcessor instrumentProcessor) { - this.instrumentProcessor = instrumentProcessor; - } - - @Override - public void observe(long sum, Labels labels) { - Aggregator aggregator = instrumentProcessor.getAggregator(); - aggregator.recordLong(sum); - instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); + public List collectAll() { + collectLock.lock(); + try { + metricUpdater.run(); + return instrumentProcessor.completeCollectionCycle(); + } finally { + collectLock.unlock(); } } }