Rename accumulator to processor to match Go (#2255)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-12-09 20:19:31 -08:00 committed by GitHub
parent 56235a1114
commit feedef2366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 103 additions and 106 deletions

View File

@ -18,15 +18,15 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
extends AbstractInstrument implements AsynchronousInstrument<T> {
@Nullable private final Callback<T> metricUpdater;
private final ReentrantLock collectLock = new ReentrantLock();
private final InstrumentAccumulator instrumentAccumulator;
private final InstrumentProcessor instrumentProcessor;
AbstractAsynchronousInstrument(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<T> metricUpdater) {
super(descriptor);
this.metricUpdater = metricUpdater;
this.instrumentAccumulator = instrumentAccumulator;
this.instrumentProcessor = instrumentProcessor;
}
@Override
@ -36,14 +36,14 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
}
collectLock.lock();
try {
metricUpdater.update(newResult(instrumentAccumulator));
return instrumentAccumulator.completeCollectionCycle();
metricUpdater.update(newResult(instrumentProcessor));
return instrumentProcessor.completeCollectionCycle();
} finally {
collectLock.unlock();
}
}
abstract T newResult(InstrumentAccumulator instrumentAccumulator);
abstract T newResult(InstrumentProcessor instrumentProcessor);
abstract static class Builder<B extends AbstractInstrument.Builder<?>>
extends AbstractInstrument.Builder<B> {
@ -60,29 +60,29 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
extends AbstractAsynchronousInstrument<LongResult> {
AbstractLongAsynchronousInstrument(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<LongResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
@Override
LongResultSdk newResult(InstrumentAccumulator instrumentAccumulator) {
return new LongResultSdk(instrumentAccumulator);
LongResultSdk newResult(InstrumentProcessor instrumentProcessor) {
return new LongResultSdk(instrumentProcessor);
}
private static final class LongResultSdk implements LongResult {
private final InstrumentAccumulator instrumentAccumulator;
private final InstrumentProcessor instrumentProcessor;
private LongResultSdk(InstrumentAccumulator instrumentAccumulator) {
this.instrumentAccumulator = instrumentAccumulator;
private LongResultSdk(InstrumentProcessor instrumentProcessor) {
this.instrumentProcessor = instrumentProcessor;
}
@Override
public void observe(long sum, Labels labels) {
Aggregator aggregator = instrumentAccumulator.getAggregator();
Aggregator aggregator = instrumentProcessor.getAggregator();
aggregator.recordLong(sum);
instrumentAccumulator.batch(labels, aggregator, /* mappedAggregator= */ false);
instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false);
}
}
}
@ -91,29 +91,29 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
extends AbstractAsynchronousInstrument<DoubleResult> {
AbstractDoubleAsynchronousInstrument(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<DoubleResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
@Override
DoubleResultSdk newResult(InstrumentAccumulator instrumentAccumulator) {
return new DoubleResultSdk(instrumentAccumulator);
DoubleResultSdk newResult(InstrumentProcessor instrumentProcessor) {
return new DoubleResultSdk(instrumentProcessor);
}
private static final class DoubleResultSdk implements DoubleResult {
private final InstrumentAccumulator instrumentAccumulator;
private final InstrumentProcessor instrumentProcessor;
private DoubleResultSdk(InstrumentAccumulator instrumentAccumulator) {
this.instrumentAccumulator = instrumentAccumulator;
private DoubleResultSdk(InstrumentProcessor instrumentProcessor) {
this.instrumentProcessor = instrumentProcessor;
}
@Override
public void observe(double sum, Labels labels) {
Aggregator aggregator = instrumentAccumulator.getAggregator();
Aggregator aggregator = instrumentProcessor.getAggregator();
aggregator.recordDouble(sum);
instrumentAccumulator.batch(labels, aggregator, /* mappedAggregator= */ false);
instrumentProcessor.batch(labels, aggregator, /* mappedAggregator= */ false);
}
}
}

View File

@ -101,7 +101,7 @@ abstract class AbstractInstrument implements Instrument {
return meterSharedState.getInstrumentRegistry().register(instrument);
}
protected InstrumentAccumulator getBatcher(InstrumentDescriptor descriptor) {
protected InstrumentProcessor getBatcher(InstrumentDescriptor descriptor) {
return meterSdk.createBatcher(descriptor, meterProviderSharedState, meterSharedState);
}
}

View File

@ -17,14 +17,14 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
extends AbstractInstrument {
private final ConcurrentHashMap<Labels, B> boundLabels;
private final ReentrantLock collectLock;
private final InstrumentAccumulator instrumentAccumulator;
private final InstrumentProcessor instrumentProcessor;
AbstractSynchronousInstrument(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor);
boundLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock();
this.instrumentAccumulator = instrumentAccumulator;
this.instrumentProcessor = instrumentProcessor;
}
public B bind(Labels labels) {
@ -36,7 +36,7 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
}
// Missing entry or no longer mapped, try to add a new entry.
binding = newBinding(instrumentAccumulator);
binding = newBinding(instrumentProcessor);
while (true) {
B oldBound = boundLabels.putIfAbsent(labels, binding);
if (oldBound != null) {
@ -68,14 +68,13 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
// acquire but because we requested a specific value only one will succeed.
boundLabels.remove(entry.getKey(), entry.getValue());
}
instrumentAccumulator.batch(
entry.getKey(), entry.getValue().getAggregator(), !unmappedEntry);
instrumentProcessor.batch(entry.getKey(), entry.getValue().getAggregator(), !unmappedEntry);
}
return instrumentAccumulator.completeCollectionCycle();
return instrumentProcessor.completeCollectionCycle();
} finally {
collectLock.unlock();
}
}
abstract B newBinding(InstrumentAccumulator instrumentAccumulator);
abstract B newBinding(InstrumentProcessor instrumentProcessor);
}

View File

@ -15,8 +15,8 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument<BoundInstrume
implements DoubleCounter {
private DoubleCounterSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -35,15 +35,15 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument<BoundInstrume
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements DoubleCounter.BoundDoubleCounter {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -16,9 +16,9 @@ final class DoubleSumObserverSdk extends AbstractDoubleAsynchronousInstrument
DoubleSumObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<DoubleResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -15,8 +15,8 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument<BoundIn
implements DoubleUpDownCounter {
private DoubleUpDownCounterSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -32,15 +32,15 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument<BoundIn
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleUpDownCounter {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -16,9 +16,9 @@ final class DoubleUpDownSumObserverSdk extends AbstractDoubleAsynchronousInstrum
DoubleUpDownSumObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<DoubleResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -16,9 +16,9 @@ final class DoubleValueObserverSdk extends AbstractDoubleAsynchronousInstrument
DoubleValueObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<DoubleResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -15,8 +15,8 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument<BoundIn
implements DoubleValueRecorder {
private DoubleValueRecorderSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -32,15 +32,15 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument<BoundIn
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleValueRecorder {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -21,7 +21,7 @@ import java.util.Map;
import java.util.Objects;
/**
* An {@code InstrumentAccumulator} represents an internal instance of an {@code Accumulator} for a
* An {@code InstrumentProcessor} represents an internal instance of an {@code Accumulator} for a
* specific {code Instrument}. It records individual measurements (via the {@code Aggregator}). It
* batches together {@code Aggregator}s for the similar sets of labels.
*
@ -29,7 +29,7 @@ import java.util.Objects;
* cycle must be protected by a lock. A collection cycle is defined by multiple calls to {@link
* #batch(Labels, Aggregator, boolean)} followed by one {@link #completeCollectionCycle()};
*/
final class InstrumentAccumulator {
final class InstrumentProcessor {
private final InstrumentDescriptor descriptor;
private final Aggregation aggregation;
private final Resource resource;
@ -45,12 +45,12 @@ final class InstrumentAccumulator {
* aggregation. "Cumulative" means that all metrics that are generated will be considered for the
* lifetime of the Instrument being aggregated.
*/
static InstrumentAccumulator getCumulativeAllLabels(
static InstrumentProcessor getCumulativeAllLabels(
InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Aggregation aggregation) {
return new InstrumentAccumulator(
return new InstrumentProcessor(
descriptor,
aggregation,
meterProviderSharedState.getResource(),
@ -64,12 +64,12 @@ final class InstrumentAccumulator {
* aggregation. "Delta" means that all metrics that are generated are only for the most recent
* collection interval.
*/
static InstrumentAccumulator getDeltaAllLabels(
static InstrumentProcessor getDeltaAllLabels(
InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Aggregation aggregation) {
return new InstrumentAccumulator(
return new InstrumentProcessor(
descriptor,
aggregation,
meterProviderSharedState.getResource(),
@ -78,7 +78,7 @@ final class InstrumentAccumulator {
/* delta= */ true);
}
private InstrumentAccumulator(
private InstrumentProcessor(
InstrumentDescriptor descriptor,
Aggregation aggregation,
Resource resource,
@ -113,7 +113,7 @@ final class InstrumentAccumulator {
* @param aggregator the {@link Aggregator} used to aggregate individual events for the given
* {@code LabelSetSdk}.
* @param mappedAggregator {@code true} if the {@code Aggregator} is still in used by a binding.
* If {@code false} the {@code Batcher} can reuse the {@code Aggregator} instance.
* If {@code false} the {@code InstrumentProcessor} can reuse the {@code Aggregator} instance.
*/
void batch(Labels labelSet, Aggregator aggregator, boolean mappedAggregator) {
if (!aggregator.hasRecordings()) {
@ -183,7 +183,7 @@ final class InstrumentAccumulator {
return false;
}
InstrumentAccumulator allLabels = (InstrumentAccumulator) o;
InstrumentProcessor allLabels = (InstrumentProcessor) o;
if (startEpochNanos != allLabels.startEpochNanos) {
return false;

View File

@ -14,9 +14,8 @@ import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
final class LongCounterSdk extends AbstractSynchronousInstrument<BoundInstrument>
implements LongCounter {
private LongCounterSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
private LongCounterSdk(InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -35,15 +34,15 @@ final class LongCounterSdk extends AbstractSynchronousInstrument<BoundInstrument
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements LongCounter.BoundLongCounter {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -15,9 +15,9 @@ final class LongSumObserverSdk extends AbstractLongAsynchronousInstrument
implements LongSumObserver {
LongSumObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<LongResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -15,8 +15,8 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument<BoundInst
implements LongUpDownCounter {
private LongUpDownCounterSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -32,15 +32,15 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument<BoundInst
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements BoundLongUpDownCounter {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -15,9 +15,9 @@ final class LongUpDownSumObserverSdk extends AbstractLongAsynchronousInstrument
implements LongUpDownSumObserver {
LongUpDownSumObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<LongResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -16,9 +16,9 @@ final class LongValueObserverSdk extends AbstractLongAsynchronousInstrument
LongValueObserverSdk(
InstrumentDescriptor descriptor,
InstrumentAccumulator instrumentAccumulator,
InstrumentProcessor instrumentProcessor,
@Nullable Callback<LongResult> metricUpdater) {
super(descriptor, instrumentAccumulator, metricUpdater);
super(descriptor, instrumentProcessor, metricUpdater);
}
static final class Builder

View File

@ -15,8 +15,8 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument<BoundInst
implements LongValueRecorder {
private LongValueRecorderSdk(
InstrumentDescriptor descriptor, InstrumentAccumulator instrumentAccumulator) {
super(descriptor, instrumentAccumulator);
InstrumentDescriptor descriptor, InstrumentProcessor instrumentProcessor) {
super(descriptor, instrumentProcessor);
}
@Override
@ -32,15 +32,15 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument<BoundInst
}
@Override
BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(instrumentAccumulator);
BoundInstrument newBinding(InstrumentProcessor instrumentProcessor) {
return new BoundInstrument(instrumentProcessor);
}
static final class BoundInstrument extends AbstractBoundInstrument
implements BoundLongValueRecorder {
BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(instrumentAccumulator.getAggregator());
BoundInstrument(InstrumentProcessor instrumentProcessor) {
super(instrumentProcessor.getAggregator());
}
@Override

View File

@ -113,10 +113,9 @@ final class MeterSdk implements Meter {
}
/**
* Creates a {@link InstrumentAccumulator}, by using the {@link ViewRegistry} to do the actual
* work.
* Creates a {@link InstrumentProcessor}, by using the {@link ViewRegistry} to do the actual work.
*/
InstrumentAccumulator createBatcher(
InstrumentProcessor createBatcher(
InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState) {

View File

@ -44,8 +44,8 @@ class ViewRegistry {
aggregationChooser.addView(selector, specification);
}
/** Create a new {@link InstrumentAccumulator} for use in metric recording aggregation. */
InstrumentAccumulator createBatcher(
/** Create a new {@link InstrumentProcessor} for use in metric recording aggregation. */
InstrumentProcessor createBatcher(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) {
@ -55,10 +55,10 @@ class ViewRegistry {
Aggregation aggregation = specification.aggregation();
if (Temporality.CUMULATIVE == specification.temporality()) {
return InstrumentAccumulator.getCumulativeAllLabels(
return InstrumentProcessor.getCumulativeAllLabels(
descriptor, meterProviderSharedState, meterSharedState, aggregation);
} else if (Temporality.DELTA == specification.temporality()) {
return InstrumentAccumulator.getDeltaAllLabels(
return InstrumentProcessor.getDeltaAllLabels(
descriptor, meterProviderSharedState, meterSharedState, aggregation);
}
throw new IllegalStateException("unsupported Temporality: " + specification.temporality());

View File

@ -55,17 +55,17 @@ class ViewRegistryTest {
AggregationConfiguration specification =
AggregationConfiguration.create(
Aggregations.count(), AggregationConfiguration.Temporality.CUMULATIVE);
InstrumentAccumulator expectedInstrumentAccumulator =
InstrumentAccumulator.getCumulativeAllLabels(
InstrumentProcessor expectedInstrumentProcessor =
InstrumentProcessor.getCumulativeAllLabels(
descriptor, providerSharedState, meterSharedState, Aggregations.count());
when(chooser.chooseAggregation(descriptor)).thenReturn(specification);
InstrumentAccumulator result =
InstrumentProcessor result =
viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor);
assertThat(result.generatesDeltas()).isFalse();
assertThat(result).isEqualTo(expectedInstrumentAccumulator);
assertThat(result).isEqualTo(expectedInstrumentProcessor);
assertThat(result).isNotNull();
}
@ -87,17 +87,17 @@ class ViewRegistryTest {
AggregationConfiguration specification =
AggregationConfiguration.create(
Aggregations.count(), AggregationConfiguration.Temporality.DELTA);
InstrumentAccumulator expectedInstrumentAccumulator =
InstrumentAccumulator.getDeltaAllLabels(
InstrumentProcessor expectedInstrumentProcessor =
InstrumentProcessor.getDeltaAllLabels(
descriptor, providerSharedState, meterSharedState, Aggregations.count());
when(chooser.chooseAggregation(descriptor)).thenReturn(specification);
InstrumentAccumulator result =
InstrumentProcessor result =
viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor);
assertThat(result.generatesDeltas()).isTrue();
assertThat(result).isEqualTo(expectedInstrumentAccumulator);
assertThat(result).isEqualTo(expectedInstrumentProcessor);
assertThat(result).isNotNull();
}