Simplify logic in AsynchronousInstrumentAccumulator (#2349)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
		
							parent
							
								
									94422d9dc0
								
							
						
					
					
						commit
						0412d47ffe
					
				|  | @ -5,7 +5,6 @@ | |||
| 
 | ||||
| package io.opentelemetry.sdk.metrics; | ||||
| 
 | ||||
| import io.opentelemetry.api.common.Labels; | ||||
| import io.opentelemetry.api.metrics.AsynchronousInstrument; | ||||
| import io.opentelemetry.sdk.metrics.aggregator.Aggregator; | ||||
| import io.opentelemetry.sdk.metrics.data.MetricData; | ||||
|  | @ -17,34 +16,24 @@ import javax.annotation.Nullable; | |||
| final class AsynchronousInstrumentAccumulator { | ||||
|   private final ReentrantLock collectLock = new ReentrantLock(); | ||||
|   private final InstrumentProcessor instrumentProcessor; | ||||
|   private final Consumer<InstrumentProcessor> metricUpdater; | ||||
| 
 | ||||
|   private AsynchronousInstrumentAccumulator( | ||||
|       InstrumentProcessor instrumentProcessor, Consumer<InstrumentProcessor> metricUpdater) { | ||||
|     this.metricUpdater = metricUpdater; | ||||
|     this.instrumentProcessor = instrumentProcessor; | ||||
|   } | ||||
| 
 | ||||
|   public List<MetricData> collectAll() { | ||||
|     collectLock.lock(); | ||||
|     try { | ||||
|       metricUpdater.accept(instrumentProcessor); | ||||
|       return instrumentProcessor.completeCollectionCycle(); | ||||
|     } finally { | ||||
|       collectLock.unlock(); | ||||
|     } | ||||
|   } | ||||
|   private final Runnable metricUpdater; | ||||
| 
 | ||||
|   static AsynchronousInstrumentAccumulator doubleAsynchronousAccumulator( | ||||
|       InstrumentProcessor instrumentProcessor, | ||||
|       @Nullable Consumer<AsynchronousInstrument.DoubleResult> metricUpdater) { | ||||
|     // TODO: Decide what to do with null updater. | ||||
|     if (metricUpdater == null) { | ||||
|       return new AsynchronousInstrumentAccumulator(instrumentProcessor, instrumentProcessor1 -> {}); | ||||
|       return new AsynchronousInstrumentAccumulator(instrumentProcessor, () -> {}); | ||||
|     } | ||||
|     DoubleResultSdk result = new DoubleResultSdk(instrumentProcessor); | ||||
|     AsynchronousInstrument.DoubleResult result = | ||||
|         (value, labels) -> { | ||||
|           Aggregator aggregator = instrumentProcessor.getAggregator(); | ||||
|           aggregator.recordDouble(value); | ||||
|           instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); | ||||
|         }; | ||||
| 
 | ||||
|     return new AsynchronousInstrumentAccumulator( | ||||
|         instrumentProcessor, instrumentProcessor1 -> metricUpdater.accept(result)); | ||||
|         instrumentProcessor, () -> metricUpdater.accept(result)); | ||||
|   } | ||||
| 
 | ||||
|   static AsynchronousInstrumentAccumulator longAsynchronousAccumulator( | ||||
|  | @ -52,40 +41,33 @@ final class AsynchronousInstrumentAccumulator { | |||
|       @Nullable Consumer<AsynchronousInstrument.LongResult> metricUpdater) { | ||||
|     // TODO: Decide what to do with null updater. | ||||
|     if (metricUpdater == null) { | ||||
|       return new AsynchronousInstrumentAccumulator(instrumentProcessor, instrumentProcessor1 -> {}); | ||||
|       return new AsynchronousInstrumentAccumulator(instrumentProcessor, () -> {}); | ||||
|     } | ||||
|     LongResultSdk result = new LongResultSdk(instrumentProcessor); | ||||
| 
 | ||||
|     AsynchronousInstrument.LongResult result = | ||||
|         (value, labels) -> { | ||||
|           Aggregator aggregator = instrumentProcessor.getAggregator(); | ||||
|           aggregator.recordLong(value); | ||||
|           instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); | ||||
|         }; | ||||
| 
 | ||||
|     return new AsynchronousInstrumentAccumulator( | ||||
|         instrumentProcessor, instrumentProcessor1 -> metricUpdater.accept(result)); | ||||
|         instrumentProcessor, () -> metricUpdater.accept(result)); | ||||
|   } | ||||
| 
 | ||||
|   private static final class DoubleResultSdk implements AsynchronousInstrument.DoubleResult { | ||||
|     private final InstrumentProcessor instrumentProcessor; | ||||
| 
 | ||||
|     private DoubleResultSdk(InstrumentProcessor instrumentProcessor) { | ||||
|       this.instrumentProcessor = instrumentProcessor; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void observe(double sum, Labels labels) { | ||||
|       Aggregator aggregator = instrumentProcessor.getAggregator(); | ||||
|       aggregator.recordDouble(sum); | ||||
|       instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); | ||||
|     } | ||||
|   private AsynchronousInstrumentAccumulator( | ||||
|       InstrumentProcessor instrumentProcessor, Runnable metricUpdater) { | ||||
|     this.metricUpdater = metricUpdater; | ||||
|     this.instrumentProcessor = instrumentProcessor; | ||||
|   } | ||||
| 
 | ||||
|   private static final class LongResultSdk implements AsynchronousInstrument.LongResult { | ||||
|     private final InstrumentProcessor instrumentProcessor; | ||||
| 
 | ||||
|     private LongResultSdk(InstrumentProcessor instrumentProcessor) { | ||||
|       this.instrumentProcessor = instrumentProcessor; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void observe(long sum, Labels labels) { | ||||
|       Aggregator aggregator = instrumentProcessor.getAggregator(); | ||||
|       aggregator.recordLong(sum); | ||||
|       instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false); | ||||
|   public List<MetricData> collectAll() { | ||||
|     collectLock.lock(); | ||||
|     try { | ||||
|       metricUpdater.run(); | ||||
|       return instrumentProcessor.completeCollectionCycle(); | ||||
|     } finally { | ||||
|       collectLock.unlock(); | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue