Rename batcher to instrumentaccumulator, remove active batcher (not used) (#2245)

* Rename batcher to instrumentaccumulator, remove active batcher (not used)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Fix tests, add back check for has records

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-12-09 11:27:20 -08:00 committed by GitHub
parent 681579c30b
commit d9260d0656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 168 additions and 197 deletions

View File

@ -23,9 +23,9 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<T> metricUpdater) { @Nullable Callback<T> metricUpdater) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
this.metricUpdater = metricUpdater; this.metricUpdater = metricUpdater;
} }
@ -36,15 +36,15 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
} }
collectLock.lock(); collectLock.lock();
try { try {
final ActiveBatcher activeBatcher = getActiveBatcher(); final InstrumentAccumulator instrumentAccumulator = getInstrumentAccumulator();
metricUpdater.update(newResult(activeBatcher)); metricUpdater.update(newResult(instrumentAccumulator));
return activeBatcher.completeCollectionCycle(); return instrumentAccumulator.completeCollectionCycle();
} finally { } finally {
collectLock.unlock(); collectLock.unlock();
} }
} }
abstract T newResult(ActiveBatcher activeBatcher); abstract T newResult(InstrumentAccumulator instrumentAccumulator);
abstract static class Builder<B extends AbstractInstrument.Builder<?>> abstract static class Builder<B extends AbstractInstrument.Builder<?>>
extends AbstractInstrument.Builder<B> { extends AbstractInstrument.Builder<B> {
@ -63,29 +63,34 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<LongResult> metricUpdater) { @Nullable Callback<LongResult> metricUpdater) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher, metricUpdater); super(
descriptor,
meterProviderSharedState,
meterSharedState,
instrumentAccumulator,
metricUpdater);
} }
@Override @Override
LongResultSdk newResult(ActiveBatcher activeBatcher) { LongResultSdk newResult(InstrumentAccumulator instrumentAccumulator) {
return new LongResultSdk(activeBatcher); return new LongResultSdk(instrumentAccumulator);
} }
private static final class LongResultSdk implements LongResult { private static final class LongResultSdk implements LongResult {
private final ActiveBatcher activeBatcher; private final InstrumentAccumulator instrumentAccumulator;
private LongResultSdk(ActiveBatcher activeBatcher) { private LongResultSdk(InstrumentAccumulator instrumentAccumulator) {
this.activeBatcher = activeBatcher; this.instrumentAccumulator = instrumentAccumulator;
} }
@Override @Override
public void observe(long sum, Labels labels) { public void observe(long sum, Labels labels) {
Aggregator aggregator = activeBatcher.getAggregator(); Aggregator aggregator = instrumentAccumulator.getAggregator();
aggregator.recordLong(sum); aggregator.recordLong(sum);
activeBatcher.batch(labels, aggregator, /* mappedAggregator= */ false); instrumentAccumulator.batch(labels, aggregator, /* mappedAggregator= */ false);
} }
} }
} }
@ -96,29 +101,34 @@ abstract class AbstractAsynchronousInstrument<T extends AsynchronousInstrument.R
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<DoubleResult> metricUpdater) { @Nullable Callback<DoubleResult> metricUpdater) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher, metricUpdater); super(
descriptor,
meterProviderSharedState,
meterSharedState,
instrumentAccumulator,
metricUpdater);
} }
@Override @Override
DoubleResultSdk newResult(ActiveBatcher activeBatcher) { DoubleResultSdk newResult(InstrumentAccumulator instrumentAccumulator) {
return new DoubleResultSdk(activeBatcher); return new DoubleResultSdk(instrumentAccumulator);
} }
private static final class DoubleResultSdk implements DoubleResult { private static final class DoubleResultSdk implements DoubleResult {
private final ActiveBatcher activeBatcher; private final InstrumentAccumulator instrumentAccumulator;
private DoubleResultSdk(ActiveBatcher activeBatcher) { private DoubleResultSdk(InstrumentAccumulator instrumentAccumulator) {
this.activeBatcher = activeBatcher; this.instrumentAccumulator = instrumentAccumulator;
} }
@Override @Override
public void observe(double sum, Labels labels) { public void observe(double sum, Labels labels) {
Aggregator aggregator = activeBatcher.getAggregator(); Aggregator aggregator = instrumentAccumulator.getAggregator();
aggregator.recordDouble(sum); aggregator.recordDouble(sum);
activeBatcher.batch(labels, aggregator, /* mappedAggregator= */ false); instrumentAccumulator.batch(labels, aggregator, /* mappedAggregator= */ false);
} }
} }
} }

View File

@ -19,18 +19,18 @@ abstract class AbstractInstrument implements Instrument {
private final InstrumentDescriptor descriptor; private final InstrumentDescriptor descriptor;
private final MeterProviderSharedState meterProviderSharedState; private final MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState; private final MeterSharedState meterSharedState;
private final ActiveBatcher activeBatcher; private final InstrumentAccumulator instrumentAccumulator;
// All arguments cannot be null because they are checked in the abstract builder classes. // All arguments cannot be null because they are checked in the abstract builder classes.
AbstractInstrument( AbstractInstrument(
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher) { InstrumentAccumulator instrumentAccumulator) {
this.descriptor = descriptor; this.descriptor = descriptor;
this.meterProviderSharedState = meterProviderSharedState; this.meterProviderSharedState = meterProviderSharedState;
this.meterSharedState = meterSharedState; this.meterSharedState = meterSharedState;
this.activeBatcher = activeBatcher; this.instrumentAccumulator = instrumentAccumulator;
} }
final InstrumentDescriptor getDescriptor() { final InstrumentDescriptor getDescriptor() {
@ -45,8 +45,8 @@ abstract class AbstractInstrument implements Instrument {
return meterSharedState; return meterSharedState;
} }
final ActiveBatcher getActiveBatcher() { final InstrumentAccumulator getInstrumentAccumulator() {
return activeBatcher; return instrumentAccumulator;
} }
/** /**
@ -131,7 +131,7 @@ abstract class AbstractInstrument implements Instrument {
return getMeterSharedState().getInstrumentRegistry().register(instrument); return getMeterSharedState().getInstrumentRegistry().register(instrument);
} }
protected Batcher getBatcher(InstrumentDescriptor descriptor) { protected InstrumentAccumulator getBatcher(InstrumentDescriptor descriptor) {
return meterSdk.createBatcher(descriptor, meterProviderSharedState, meterSharedState); return meterSdk.createBatcher(descriptor, meterProviderSharedState, meterSharedState);
} }
} }

View File

@ -22,8 +22,8 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
boundLabels = new ConcurrentHashMap<>(); boundLabels = new ConcurrentHashMap<>();
collectLock = new ReentrantLock(); collectLock = new ReentrantLock();
} }
@ -37,7 +37,7 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
} }
// Missing entry or no longer mapped, try to add a new entry. // Missing entry or no longer mapped, try to add a new entry.
binding = newBinding(getActiveBatcher()); binding = newBinding(getInstrumentAccumulator());
while (true) { while (true) {
B oldBound = boundLabels.putIfAbsent(labels, binding); B oldBound = boundLabels.putIfAbsent(labels, binding);
if (oldBound != null) { if (oldBound != null) {
@ -62,7 +62,7 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
final List<MetricData> collectAll() { final List<MetricData> collectAll() {
collectLock.lock(); collectLock.lock();
try { try {
Batcher batcher = getActiveBatcher(); InstrumentAccumulator instrumentAccumulator = getInstrumentAccumulator();
for (Map.Entry<Labels, B> entry : boundLabels.entrySet()) { for (Map.Entry<Labels, B> entry : boundLabels.entrySet()) {
boolean unmappedEntry = entry.getValue().tryUnmap(); boolean unmappedEntry = entry.getValue().tryUnmap();
if (unmappedEntry) { if (unmappedEntry) {
@ -70,13 +70,14 @@ abstract class AbstractSynchronousInstrument<B extends AbstractBoundInstrument>
// acquire but because we requested a specific value only one will succeed. // acquire but because we requested a specific value only one will succeed.
boundLabels.remove(entry.getKey(), entry.getValue()); boundLabels.remove(entry.getKey(), entry.getValue());
} }
batcher.batch(entry.getKey(), entry.getValue().getAggregator(), unmappedEntry); instrumentAccumulator.batch(
entry.getKey(), entry.getValue().getAggregator(), unmappedEntry);
} }
return batcher.completeCollectionCycle(); return instrumentAccumulator.completeCollectionCycle();
} finally { } finally {
collectLock.unlock(); collectLock.unlock();
} }
} }
abstract B newBinding(Batcher batcher); abstract B newBinding(InstrumentAccumulator instrumentAccumulator);
} }

View File

@ -1,49 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.metrics;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.metrics.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List;
/**
* Tracks a list of active Batchers used to aggregate measurements recorded by one {@code
* Instrument}.
*
* <p>TODO: Add support for multiple "Batchers" in the same time.
*
* <p>TODO: Consider if support for changing batchers at runtime is needed.
*/
final class ActiveBatcher implements Batcher {
private final Batcher batcher;
ActiveBatcher(Batcher batcher) {
this.batcher = batcher;
}
@Override
public Aggregator getAggregator() {
return batcher.getAggregator();
}
@Override
public void batch(Labels labelSet, Aggregator aggregator, boolean mappedAggregator) {
if (aggregator.hasRecordings()) {
batcher.batch(labelSet, aggregator, mappedAggregator);
}
}
@Override
public List<MetricData> completeCollectionCycle() {
return batcher.completeCollectionCycle();
}
@Override
public boolean generatesDeltas() {
return batcher.generatesDeltas();
}
}

View File

@ -18,8 +18,8 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument<BoundInstrume
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -38,15 +38,15 @@ final class DoubleCounterSdk extends AbstractSynchronousInstrument<BoundInstrume
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements DoubleCounter.BoundDoubleCounter { implements DoubleCounter.BoundDoubleCounter {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -18,13 +18,13 @@ final class DoubleSumObserverSdk extends AbstractDoubleAsynchronousInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<DoubleResult> metricUpdater) { @Nullable Callback<DoubleResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,8 +18,8 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument<BoundIn
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -35,15 +35,15 @@ final class DoubleUpDownCounterSdk extends AbstractSynchronousInstrument<BoundIn
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleUpDownCounter { implements BoundDoubleUpDownCounter {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -18,13 +18,13 @@ final class DoubleUpDownSumObserverSdk extends AbstractDoubleAsynchronousInstrum
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<DoubleResult> metricUpdater) { @Nullable Callback<DoubleResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,13 +18,13 @@ final class DoubleValueObserverSdk extends AbstractDoubleAsynchronousInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<DoubleResult> metricUpdater) { @Nullable Callback<DoubleResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,8 +18,8 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument<BoundIn
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -35,15 +35,15 @@ final class DoubleValueRecorderSdk extends AbstractSynchronousInstrument<BoundIn
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements BoundDoubleValueRecorder { implements BoundDoubleValueRecorder {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -11,15 +11,15 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.List; import java.util.List;
/** /**
* A {@code Batcher} represents an internal representation of an {code Instrument} aggregation * An {@code InstrumentAccumulator} represents an internal instance of an {@code Accumulator} for a
* process. It records individual measurements (via the {@code Aggregator}). It batches together * specific {code Instrument}. It records individual measurements (via the {@code Aggregator}). It
* {@code Aggregator}s for the similar sets of labels. * batches together {@code Aggregator}s for the similar sets of labels.
* *
* <p>The only thread safe method in this class is {@link #getAggregator()}. An entire collection * <p>The only thread safe method in this class is {@link #getAggregator()}. An entire collection
* cycle must be protected by a lock. A collection cycle is defined by multiple calls to {@link * cycle must be protected by a lock. A collection cycle is defined by multiple calls to {@link
* #batch(Labels, Aggregator, boolean)} followed by one {@link #completeCollectionCycle()}; * #batch(Labels, Aggregator, boolean)} followed by one {@link #completeCollectionCycle()};
*/ */
interface Batcher { interface InstrumentAccumulator {
/** /**
* Returns the {@link Aggregator} that should be used by the bindings, or observers. * Returns the {@link Aggregator} that should be used by the bindings, or observers.

View File

@ -20,11 +20,12 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** A collection of available Batchers. */ /** A collection of available InstrumentAccumulators. */
final class Batchers { final class InstrumentAccumulators {
static Batcher getNoop() { static InstrumentAccumulator getNoop() {
return Noop.INSTANCE; return Noop.INSTANCE;
} }
@ -33,7 +34,7 @@ final class Batchers {
* "Cumulative" means that all metrics that are generated will be considered for the lifetime of * "Cumulative" means that all metrics that are generated will be considered for the lifetime of
* the Instrument being aggregated. * the Instrument being aggregated.
*/ */
static Batcher getCumulativeAllLabels( static InstrumentAccumulator getCumulativeAllLabels(
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
@ -51,7 +52,7 @@ final class Batchers {
* Create a Batcher that uses the "delta" Temporality and uses all labels for aggregation. "Delta" * Create a Batcher that uses the "delta" Temporality and uses all labels for aggregation. "Delta"
* means that all metrics that are generated are only for the most recent collection interval. * means that all metrics that are generated are only for the most recent collection interval.
*/ */
static Batcher getDeltaAllLabels( static InstrumentAccumulator getDeltaAllLabels(
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
@ -65,7 +66,7 @@ final class Batchers {
/* delta= */ true); /* delta= */ true);
} }
private static final class Noop implements Batcher { private static final class Noop implements InstrumentAccumulator {
private static final Noop INSTANCE = new Noop(); private static final Noop INSTANCE = new Noop();
@Override @Override
@ -87,7 +88,7 @@ final class Batchers {
} }
} }
private static final class AllLabels implements Batcher { private static final class AllLabels implements InstrumentAccumulator {
private final InstrumentDescriptor descriptor; private final InstrumentDescriptor descriptor;
private final Aggregation aggregation; private final Aggregation aggregation;
private final Resource resource; private final Resource resource;
@ -123,6 +124,9 @@ final class Batchers {
@Override @Override
public final void batch(Labels labelSet, Aggregator aggregator, boolean unmappedAggregator) { public final void batch(Labels labelSet, Aggregator aggregator, boolean unmappedAggregator) {
if (!aggregator.hasRecordings()) {
return;
}
Aggregator currentAggregator = aggregatorMap.get(labelSet); Aggregator currentAggregator = aggregatorMap.get(labelSet);
if (currentAggregator == null) { if (currentAggregator == null) {
// This aggregator is not mapped, we can use this instance. // This aggregator is not mapped, we can use this instance.
@ -183,35 +187,25 @@ final class Batchers {
if (delta != allLabels.delta) { if (delta != allLabels.delta) {
return false; return false;
} }
if (descriptor != null if (!Objects.equals(descriptor, allLabels.descriptor)) {
? !descriptor.equals(allLabels.descriptor)
: allLabels.descriptor != null) {
return false; return false;
} }
if (aggregation != null if (!Objects.equals(aggregation, allLabels.aggregation)) {
? !aggregation.equals(allLabels.aggregation)
: allLabels.aggregation != null) {
return false; return false;
} }
if (resource != null ? !resource.equals(allLabels.resource) : allLabels.resource != null) { if (!Objects.equals(resource, allLabels.resource)) {
return false; return false;
} }
if (instrumentationLibraryInfo != null if (!Objects.equals(instrumentationLibraryInfo, allLabels.instrumentationLibraryInfo)) {
? !instrumentationLibraryInfo.equals(allLabels.instrumentationLibraryInfo)
: allLabels.instrumentationLibraryInfo != null) {
return false; return false;
} }
if (clock != null ? !clock.equals(allLabels.clock) : allLabels.clock != null) { if (!Objects.equals(clock, allLabels.clock)) {
return false; return false;
} }
if (aggregatorFactory != null if (!Objects.equals(aggregatorFactory, allLabels.aggregatorFactory)) {
? !aggregatorFactory.equals(allLabels.aggregatorFactory)
: allLabels.aggregatorFactory != null) {
return false; return false;
} }
return aggregatorMap != null return Objects.equals(aggregatorMap, allLabels.aggregatorMap);
? aggregatorMap.equals(allLabels.aggregatorMap)
: allLabels.aggregatorMap == null;
} }
@Override @Override
@ -231,5 +225,5 @@ final class Batchers {
} }
} }
private Batchers() {} private InstrumentAccumulators() {}
} }

View File

@ -18,8 +18,8 @@ final class LongCounterSdk extends AbstractSynchronousInstrument<BoundInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -38,15 +38,15 @@ final class LongCounterSdk extends AbstractSynchronousInstrument<BoundInstrument
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements LongCounter.BoundLongCounter { implements LongCounter.BoundLongCounter {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -17,13 +17,13 @@ final class LongSumObserverSdk extends AbstractLongAsynchronousInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<LongResult> metricUpdater) { @Nullable Callback<LongResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,8 +18,8 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument<BoundInst
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -35,15 +35,15 @@ final class LongUpDownCounterSdk extends AbstractSynchronousInstrument<BoundInst
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements BoundLongUpDownCounter { implements BoundLongUpDownCounter {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -17,13 +17,13 @@ final class LongUpDownSumObserverSdk extends AbstractLongAsynchronousInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<LongResult> metricUpdater) { @Nullable Callback<LongResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,13 +18,13 @@ final class LongValueObserverSdk extends AbstractLongAsynchronousInstrument
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher, InstrumentAccumulator instrumentAccumulator,
@Nullable Callback<LongResult> metricUpdater) { @Nullable Callback<LongResult> metricUpdater) {
super( super(
descriptor, descriptor,
meterProviderSharedState, meterProviderSharedState,
meterSharedState, meterSharedState,
new ActiveBatcher(batcher), instrumentAccumulator,
metricUpdater); metricUpdater);
} }

View File

@ -18,8 +18,8 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument<BoundInst
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
Batcher batcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, new ActiveBatcher(batcher)); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -35,15 +35,15 @@ final class LongValueRecorderSdk extends AbstractSynchronousInstrument<BoundInst
} }
@Override @Override
BoundInstrument newBinding(Batcher batcher) { BoundInstrument newBinding(InstrumentAccumulator instrumentAccumulator) {
return new BoundInstrument(batcher); return new BoundInstrument(instrumentAccumulator);
} }
static final class BoundInstrument extends AbstractBoundInstrument static final class BoundInstrument extends AbstractBoundInstrument
implements BoundLongValueRecorder { implements BoundLongValueRecorder {
BoundInstrument(Batcher batcher) { BoundInstrument(InstrumentAccumulator instrumentAccumulator) {
super(batcher.getAggregator()); super(instrumentAccumulator.getAggregator());
} }
@Override @Override

View File

@ -112,8 +112,11 @@ final class MeterSdk implements Meter {
return result; return result;
} }
/** Creates a {@link Batcher}, by using the {@link ViewRegistry} to do the actual work. */ /**
Batcher createBatcher( * Creates a {@link InstrumentAccumulator}, by using the {@link ViewRegistry} to do the actual
* work.
*/
InstrumentAccumulator createBatcher(
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState) { MeterSharedState meterSharedState) {

View File

@ -44,11 +44,8 @@ class ViewRegistry {
aggregationChooser.addView(selector, specification); aggregationChooser.addView(selector, specification);
} }
/** /** Create a new {@link InstrumentAccumulator} for use in metric recording aggregation. */
* Create a new {@link io.opentelemetry.sdk.metrics.Batcher} for use in metric recording InstrumentAccumulator createBatcher(
* aggregation.
*/
Batcher createBatcher(
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
InstrumentDescriptor descriptor) { InstrumentDescriptor descriptor) {
@ -58,10 +55,10 @@ class ViewRegistry {
Aggregation aggregation = specification.aggregation(); Aggregation aggregation = specification.aggregation();
if (Temporality.CUMULATIVE == specification.temporality()) { if (Temporality.CUMULATIVE == specification.temporality()) {
return Batchers.getCumulativeAllLabels( return InstrumentAccumulators.getCumulativeAllLabels(
descriptor, meterProviderSharedState, meterSharedState, aggregation); descriptor, meterProviderSharedState, meterSharedState, aggregation);
} else if (Temporality.DELTA == specification.temporality()) { } else if (Temporality.DELTA == specification.temporality()) {
return Batchers.getDeltaAllLabels( return InstrumentAccumulators.getDeltaAllLabels(
descriptor, meterProviderSharedState, meterSharedState, aggregation); descriptor, meterProviderSharedState, meterSharedState, aggregation);
} }
throw new IllegalStateException("unsupported Temporality: " + specification.temporality()); throw new IllegalStateException("unsupported Temporality: " + specification.temporality());

View File

@ -28,17 +28,20 @@ class AbstractInstrumentTest {
InstrumentationLibraryInfo.create("test_abstract_instrument", ""); InstrumentationLibraryInfo.create("test_abstract_instrument", "");
private static final MeterSharedState METER_SHARED_STATE = private static final MeterSharedState METER_SHARED_STATE =
MeterSharedState.create(INSTRUMENTATION_LIBRARY_INFO); MeterSharedState.create(INSTRUMENTATION_LIBRARY_INFO);
private static final ActiveBatcher ACTIVE_BATCHER = new ActiveBatcher(Batchers.getNoop());
@Test @Test
void getValues() { void getValues() {
TestInstrument testInstrument = TestInstrument testInstrument =
new TestInstrument( new TestInstrument(
INSTRUMENT_DESCRIPTOR, METER_PROVIDER_SHARED_STATE, METER_SHARED_STATE, ACTIVE_BATCHER); INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE,
METER_SHARED_STATE,
InstrumentAccumulators.getNoop());
assertThat(testInstrument.getDescriptor()).isSameAs(INSTRUMENT_DESCRIPTOR); assertThat(testInstrument.getDescriptor()).isSameAs(INSTRUMENT_DESCRIPTOR);
assertThat(testInstrument.getMeterProviderSharedState()).isSameAs(METER_PROVIDER_SHARED_STATE); assertThat(testInstrument.getMeterProviderSharedState()).isSameAs(METER_PROVIDER_SHARED_STATE);
assertThat(testInstrument.getMeterSharedState()).isSameAs(METER_SHARED_STATE); assertThat(testInstrument.getMeterSharedState()).isSameAs(METER_SHARED_STATE);
assertThat(testInstrument.getActiveBatcher()).isSameAs(ACTIVE_BATCHER); assertThat(testInstrument.getInstrumentAccumulator())
.isSameAs(InstrumentAccumulators.getNoop());
} }
private static final class TestInstrument extends AbstractInstrument { private static final class TestInstrument extends AbstractInstrument {
@ -46,8 +49,8 @@ class AbstractInstrumentTest {
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override

View File

@ -29,7 +29,6 @@ class InstrumentRegistryTest {
"name", "other_description", "1", InstrumentType.COUNTER, InstrumentValueType.LONG); "name", "other_description", "1", InstrumentType.COUNTER, InstrumentValueType.LONG);
private static final MeterProviderSharedState METER_PROVIDER_SHARED_STATE = private static final MeterProviderSharedState METER_PROVIDER_SHARED_STATE =
MeterProviderSharedState.create(TestClock.create(), Resource.getEmpty()); MeterProviderSharedState.create(TestClock.create(), Resource.getEmpty());
private static final ActiveBatcher ACTIVE_BATCHER = new ActiveBatcher(Batchers.getNoop());
@Test @Test
void register() { void register() {
@ -37,7 +36,10 @@ class InstrumentRegistryTest {
MeterSharedState.create(InstrumentationLibraryInfo.getEmpty()); MeterSharedState.create(InstrumentationLibraryInfo.getEmpty());
TestInstrument testInstrument = TestInstrument testInstrument =
new TestInstrument( new TestInstrument(
INSTRUMENT_DESCRIPTOR, METER_PROVIDER_SHARED_STATE, meterSharedState, ACTIVE_BATCHER); INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE,
meterSharedState,
InstrumentAccumulators.getNoop());
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument)) assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument); .isSameAs(testInstrument);
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument)) assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
@ -50,7 +52,7 @@ class InstrumentRegistryTest {
INSTRUMENT_DESCRIPTOR, INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE, METER_PROVIDER_SHARED_STATE,
meterSharedState, meterSharedState,
ACTIVE_BATCHER))) InstrumentAccumulators.getNoop())))
.isSameAs(testInstrument); .isSameAs(testInstrument);
} }
@ -59,7 +61,10 @@ class InstrumentRegistryTest {
MeterSharedState meterSharedState = MeterSharedState.create(getEmpty()); MeterSharedState meterSharedState = MeterSharedState.create(getEmpty());
TestInstrument testInstrument = TestInstrument testInstrument =
new TestInstrument( new TestInstrument(
INSTRUMENT_DESCRIPTOR, METER_PROVIDER_SHARED_STATE, meterSharedState, ACTIVE_BATCHER); INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE,
meterSharedState,
InstrumentAccumulators.getNoop());
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument)) assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument); .isSameAs(testInstrument);
@ -73,7 +78,7 @@ class InstrumentRegistryTest {
OTHER_INSTRUMENT_DESCRIPTOR, OTHER_INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE, METER_PROVIDER_SHARED_STATE,
meterSharedState, meterSharedState,
ACTIVE_BATCHER)), InstrumentAccumulators.getNoop())),
"Instrument with same name and different descriptor already created."); "Instrument with same name and different descriptor already created.");
} }
@ -82,7 +87,10 @@ class InstrumentRegistryTest {
MeterSharedState meterSharedState = MeterSharedState.create(getEmpty()); MeterSharedState meterSharedState = MeterSharedState.create(getEmpty());
TestInstrument testInstrument = TestInstrument testInstrument =
new TestInstrument( new TestInstrument(
INSTRUMENT_DESCRIPTOR, METER_PROVIDER_SHARED_STATE, meterSharedState, ACTIVE_BATCHER); INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE,
meterSharedState,
InstrumentAccumulators.getNoop());
assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument)) assertThat(meterSharedState.getInstrumentRegistry().register(testInstrument))
.isSameAs(testInstrument); .isSameAs(testInstrument);
@ -96,7 +104,7 @@ class InstrumentRegistryTest {
INSTRUMENT_DESCRIPTOR, INSTRUMENT_DESCRIPTOR,
METER_PROVIDER_SHARED_STATE, METER_PROVIDER_SHARED_STATE,
meterSharedState, meterSharedState,
ACTIVE_BATCHER)), InstrumentAccumulators.getNoop())),
"Instrument with same name and different descriptor already created."); "Instrument with same name and different descriptor already created.");
} }
@ -105,8 +113,8 @@ class InstrumentRegistryTest {
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override
@ -120,8 +128,8 @@ class InstrumentRegistryTest {
InstrumentDescriptor descriptor, InstrumentDescriptor descriptor,
MeterProviderSharedState meterProviderSharedState, MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState, MeterSharedState meterSharedState,
ActiveBatcher activeBatcher) { InstrumentAccumulator instrumentAccumulator) {
super(descriptor, meterProviderSharedState, meterSharedState, activeBatcher); super(descriptor, meterProviderSharedState, meterSharedState, instrumentAccumulator);
} }
@Override @Override

View File

@ -81,7 +81,7 @@ class LongCounterSdkTest {
.setDescription("My very own counter") .setDescription("My very own counter")
.setUnit("ms") .setUnit("ms")
.build(); .build();
BoundInstrument ignored = longCounter.bind(Labels.of("foo", "bar")); BoundInstrument bound = longCounter.bind(Labels.of("foo", "bar"));
List<MetricData> metricDataList = longCounter.collectAll(); List<MetricData> metricDataList = longCounter.collectAll();
assertThat(metricDataList).hasSize(1); assertThat(metricDataList).hasSize(1);
@ -93,6 +93,8 @@ class LongCounterSdkTest {
assertThat(metricData.getResource()).isEqualTo(RESOURCE); assertThat(metricData.getResource()).isEqualTo(RESOURCE);
assertThat(metricData.getInstrumentationLibraryInfo()).isEqualTo(INSTRUMENTATION_LIBRARY_INFO); assertThat(metricData.getInstrumentationLibraryInfo()).isEqualTo(INSTRUMENTATION_LIBRARY_INFO);
assertThat(metricData.getPoints()).isEmpty(); assertThat(metricData.getPoints()).isEmpty();
bound.unbind();
} }
@Test @Test

View File

@ -55,16 +55,17 @@ class ViewRegistryTest {
AggregationConfiguration specification = AggregationConfiguration specification =
AggregationConfiguration.create( AggregationConfiguration.create(
Aggregations.count(), AggregationConfiguration.Temporality.CUMULATIVE); Aggregations.count(), AggregationConfiguration.Temporality.CUMULATIVE);
Batcher expectedBatcher = InstrumentAccumulator expectedInstrumentAccumulator =
Batchers.getCumulativeAllLabels( InstrumentAccumulators.getCumulativeAllLabels(
descriptor, providerSharedState, meterSharedState, Aggregations.count()); descriptor, providerSharedState, meterSharedState, Aggregations.count());
when(chooser.chooseAggregation(descriptor)).thenReturn(specification); when(chooser.chooseAggregation(descriptor)).thenReturn(specification);
Batcher result = viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor); InstrumentAccumulator result =
viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor);
assertThat(result.generatesDeltas()).isFalse(); assertThat(result.generatesDeltas()).isFalse();
assertThat(result).isEqualTo(expectedBatcher); assertThat(result).isEqualTo(expectedInstrumentAccumulator);
assertThat(result).isNotNull(); assertThat(result).isNotNull();
} }
@ -86,16 +87,17 @@ class ViewRegistryTest {
AggregationConfiguration specification = AggregationConfiguration specification =
AggregationConfiguration.create( AggregationConfiguration.create(
Aggregations.count(), AggregationConfiguration.Temporality.DELTA); Aggregations.count(), AggregationConfiguration.Temporality.DELTA);
Batcher expectedBatcher = InstrumentAccumulator expectedInstrumentAccumulator =
Batchers.getDeltaAllLabels( InstrumentAccumulators.getDeltaAllLabels(
descriptor, providerSharedState, meterSharedState, Aggregations.count()); descriptor, providerSharedState, meterSharedState, Aggregations.count());
when(chooser.chooseAggregation(descriptor)).thenReturn(specification); when(chooser.chooseAggregation(descriptor)).thenReturn(specification);
Batcher result = viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor); InstrumentAccumulator result =
viewRegistry.createBatcher(providerSharedState, meterSharedState, descriptor);
assertThat(result.generatesDeltas()).isTrue(); assertThat(result.generatesDeltas()).isTrue();
assertThat(result).isEqualTo(expectedBatcher); assertThat(result).isEqualTo(expectedInstrumentAccumulator);
assertThat(result).isNotNull(); assertThat(result).isNotNull();
} }