Remove AsyncInstrumentRegistry after update to SDK 1.12 (#5525)

* Remove AsyncInstrumentRegistry after update to SDK 1.12

* added comments
This commit is contained in:
Mateusz Rzeszutek 2022-03-10 05:39:23 +01:00 committed by GitHub
parent e9c1efece2
commit 0ad07a10c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 230 additions and 451 deletions

View File

@ -1,303 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.api.internal;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.cache.Cache;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import javax.annotation.Nullable;
// this class must live in the bootstrap classloader - in case different metrics instrumentations
// (micrometer) get applied to classes in different classloaders, we want them to share the same
// async instrument registry - because the underlying OTel Meter is shared too. There is only one
// OTel SDK in the agent, and therefore there must be only one AsyncInstrumentRegistry too -
// otherwise some async metrics would get lost because of duplicate instrument registrations.
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
// TODO: refactor this class, there's too much copy-paste here
public final class AsyncInstrumentRegistry {
// we need to re-use instrument registries per OpenTelemetry instance so that async instruments
// that were created by other OpenTelemetryMeterRegistries can be reused; otherwise the SDK will
// start logging errors and async measurements will not be recorded
private static final Cache<Meter, AsyncInstrumentRegistry> asyncInstrumentRegistries =
Cache.weak();
/**
* Returns the {@link AsyncInstrumentRegistry} for the passed {@link Meter}. There is at most one
* {@link AsyncInstrumentRegistry} created for each OpenTelemetry {@link Meter}.
*/
public static AsyncInstrumentRegistry getOrCreate(Meter meter) {
return asyncInstrumentRegistries.computeIfAbsent(meter, AsyncInstrumentRegistry::new);
}
// using a weak ref so that the AsyncInstrumentRegistry (which is stored in a static maps) does
// not hold strong references to Meter (and thus make it impossible to collect Meter garbage).
// in practice this should never return null - OpenTelemetryMeterRegistry maintains a strong
// reference to both Meter and AsyncInstrumentRegistry; if the meter registry is GC'd then its
// corresponding AsyncInstrumentRegistry cannot possibly be used; and Meter cannot be GC'd until
// OpenTelemetryMeterRegistry is GC'd
private final WeakReference<Meter> meter;
// values from the maps below are never removed - that is because the underlying OpenTelemetry
// async instruments are never removed; if we removed the recorder and tried to register it once
// again OTel would log an error and basically ignore the new callback
// these maps are GC'd together with this AsyncInstrumentRegistry instance - that is, when the
// whole OpenTelemetry Meter gets GC'd
private final Map<String, DoubleMeasurementsRecorder> gauges = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> doubleCounters = new ConcurrentHashMap<>();
private final Map<String, LongMeasurementsRecorder> longCounters = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> upDownDoubleCounters =
new ConcurrentHashMap<>();
private final Map<String, LongMeasurementsRecorder> upDownLongCounters =
new ConcurrentHashMap<>();
AsyncInstrumentRegistry(Meter meter) {
this.meter = new WeakReference<>(meter);
}
public <T> AsyncMeasurementHandle buildGauge(
String name,
String description,
String baseUnit,
Attributes attributes,
@Nullable T obj,
ToDoubleFunction<T> objMetric) {
DoubleMeasurementsRecorder recorder =
gauges.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(attributes, new DoubleMeasurementSource<>(obj, objMetric));
return new AsyncMeasurementHandle(recorder, attributes);
}
public <T> AsyncMeasurementHandle buildDoubleCounter(
String name,
String description,
String baseUnit,
Attributes attributes,
@Nullable T obj,
ToDoubleFunction<T> objMetric) {
DoubleMeasurementsRecorder recorder =
doubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(attributes, new DoubleMeasurementSource<>(obj, objMetric));
return new AsyncMeasurementHandle(recorder, attributes);
}
public <T> AsyncMeasurementHandle buildLongCounter(
String name,
String description,
String baseUnit,
Attributes attributes,
@Nullable T obj,
ToLongFunction<T> objMetric) {
LongMeasurementsRecorder recorder =
longCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback = new LongMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(attributes, new LongMeasurementSource<>(obj, objMetric));
return new AsyncMeasurementHandle(recorder, attributes);
}
public <T> AsyncMeasurementHandle buildUpDownDoubleCounter(
String name,
String description,
String baseUnit,
Attributes attributes,
@Nullable T obj,
ToDoubleFunction<T> objMetric) {
DoubleMeasurementsRecorder recorder =
upDownDoubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(attributes, new DoubleMeasurementSource<>(obj, objMetric));
return new AsyncMeasurementHandle(recorder, attributes);
}
public <T> AsyncMeasurementHandle buildUpDownLongCounter(
String name,
String description,
String baseUnit,
Attributes attributes,
@Nullable T obj,
ToLongFunction<T> objMetric) {
LongMeasurementsRecorder recorder =
upDownLongCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback = new LongMeasurementsRecorder();
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(attributes, new LongMeasurementSource<>(obj, objMetric));
return new AsyncMeasurementHandle(recorder, attributes);
}
private Meter otelMeter() {
Meter otelMeter = meter.get();
if (otelMeter == null) {
throw new IllegalStateException(
"OpenTelemetry Meter was garbage-collected, but the async instrument registry was not");
}
return otelMeter;
}
private abstract static class MeasurementsRecorder<I> {
final Map<Attributes, I> measurements = new ConcurrentHashMap<>();
void addMeasurement(Attributes attributes, I info) {
measurements.put(attributes, info);
}
void removeMeasurement(Attributes attributes) {
measurements.remove(attributes);
}
}
private static final class DoubleMeasurementsRecorder
extends MeasurementsRecorder<DoubleMeasurementSource<?>>
implements Consumer<ObservableDoubleMeasurement> {
@Override
public void accept(ObservableDoubleMeasurement measurement) {
measurements.forEach((attributes, gauge) -> record(measurement, attributes, gauge));
}
private static <T> void record(
ObservableDoubleMeasurement measurement,
Attributes attributes,
DoubleMeasurementSource<T> gauge) {
T obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes);
}
}
}
private static final class LongMeasurementsRecorder
extends MeasurementsRecorder<LongMeasurementSource<?>>
implements Consumer<ObservableLongMeasurement> {
@Override
public void accept(ObservableLongMeasurement measurement) {
measurements.forEach((attributes, gauge) -> record(measurement, attributes, gauge));
}
private static <T> void record(
ObservableLongMeasurement measurement,
Attributes attributes,
LongMeasurementSource<T> gauge) {
T obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsLong(obj), attributes);
}
}
}
private static final class DoubleMeasurementSource<T> {
private final WeakReference<T> objWeakRef;
private final ToDoubleFunction<T> metricFunction;
private DoubleMeasurementSource(@Nullable T obj, ToDoubleFunction<T> metricFunction) {
this.objWeakRef = new WeakReference<>(obj);
this.metricFunction = metricFunction;
}
}
private static final class LongMeasurementSource<T> {
private final WeakReference<T> objWeakRef;
private final ToLongFunction<T> metricFunction;
private LongMeasurementSource(@Nullable T obj, ToLongFunction<T> metricFunction) {
this.objWeakRef = new WeakReference<>(obj);
this.metricFunction = metricFunction;
}
}
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public static final class AsyncMeasurementHandle {
private final MeasurementsRecorder<?> measurementsRecorder;
private final Attributes attributes;
AsyncMeasurementHandle(MeasurementsRecorder<?> measurementsRecorder, Attributes attributes) {
this.measurementsRecorder = measurementsRecorder;
this.attributes = attributes;
}
public void remove() {
measurementsRecorder.removeMeasurement(attributes);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.micrometer.v1_5;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import java.lang.ref.WeakReference;
import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import javax.annotation.Nullable;
final class DoubleMeasurementRecorder<T> implements Consumer<ObservableDoubleMeasurement> {
// using a weak reference here so that the existence of the micrometer Meter does not block the
// measured object from being GC'd; e.g. a Gauge (or any other async instrument) must not block
// garbage collection of the object that it measures
private final WeakReference<T> objWeakRef;
private final ToDoubleFunction<T> metricFunction;
private final Attributes attributes;
DoubleMeasurementRecorder(
@Nullable T obj, ToDoubleFunction<T> metricFunction, Attributes attributes) {
this.objWeakRef = new WeakReference<>(obj);
this.metricFunction = metricFunction;
this.attributes = attributes;
}
@Override
public void accept(ObservableDoubleMeasurement measurement) {
T obj = objWeakRef.get();
if (obj != null) {
measurement.record(metricFunction.applyAsDouble(obj), attributes);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.micrometer.v1_5;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.lang.ref.WeakReference;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
import javax.annotation.Nullable;
final class LongMeasurementRecorder<T> implements Consumer<ObservableLongMeasurement> {
// using a weak reference here so that the existence of the micrometer Meter does not block the
// measured object from being GC'd; e.g. a Gauge (or any other async instrument) must not block
// garbage collection of the object that it measures
private final WeakReference<T> objWeakRef;
private final ToLongFunction<T> metricFunction;
private final Attributes attributes;
LongMeasurementRecorder(
@Nullable T obj, ToLongFunction<T> metricFunction, Attributes attributes) {
this.objWeakRef = new WeakReference<>(obj);
this.metricFunction = metricFunction;
this.attributes = attributes;
}
@Override
public void accept(ObservableLongMeasurement measurement) {
T obj = objWeakRef.get();
if (obj != null) {
measurement.record(metricFunction.applyAsLong(obj), attributes);
}
}
}

View File

@ -21,8 +21,7 @@ import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import java.util.Collections;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
@ -35,7 +34,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final AsyncMeasurementHandle maxHandle;
private final ObservableDoubleGauge observableMax;
private volatile boolean removed = false;
@ -45,8 +44,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
Clock clock,
DistributionStatisticConfig distributionStatisticConfig,
double scale,
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
super(id, clock, distributionStatisticConfig, scale, false);
if (isUsingMicrometerHistograms()) {
@ -58,21 +56,20 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
this.attributes = tagsAsAttributes(id, namingConvention);
String conventionName = name(id, namingConvention);
String name = name(id, namingConvention);
this.otelHistogram =
otelMeter
.histogramBuilder(conventionName)
.setDescription(description(conventionName, id))
.histogramBuilder(name)
.setDescription(description(name, id))
.setUnit(baseUnit(id))
.build();
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
conventionName + ".max",
description(conventionName, id),
baseUnit(id),
attributes,
max,
TimeWindowMax::poll);
this.observableMax =
otelMeter
.gaugeBuilder(name + ".max")
.setDescription(description(name, id))
.setUnit(baseUnit(id))
.buildWithCallback(
new DoubleMeasurementRecorder<>(max, TimeWindowMax::poll, attributes));
}
boolean isUsingMicrometerHistograms() {
@ -112,7 +109,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
observableMax.close();
}
private interface Measurements {

View File

@ -14,8 +14,8 @@ import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleCounter;
import java.util.Collections;
import java.util.function.ToDoubleFunction;
import javax.annotation.Nullable;
@ -24,25 +24,26 @@ import javax.annotation.Nullable;
final class OpenTelemetryFunctionCounter<T> implements FunctionCounter, RemovableMeter {
private final Id id;
private final AsyncMeasurementHandle countMeasurementHandle;
private final ObservableDoubleCounter observableCount;
OpenTelemetryFunctionCounter(
Id id,
NamingConvention namingConvention,
T obj,
ToDoubleFunction<T> countFunction,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
this.id = id;
String conventionName = name(id, namingConvention);
countMeasurementHandle =
asyncInstrumentRegistry.buildDoubleCounter(
conventionName,
description(conventionName, id),
baseUnit(id),
tagsAsAttributes(id, namingConvention),
obj,
countFunction);
String name = name(id, namingConvention);
observableCount =
otelMeter
.counterBuilder(name)
.ofDoubles()
.setDescription(description(name, id))
.setUnit(baseUnit(id))
.buildWithCallback(
new DoubleMeasurementRecorder<>(
obj, countFunction, tagsAsAttributes(id, namingConvention)));
}
@Override
@ -64,7 +65,7 @@ final class OpenTelemetryFunctionCounter<T> implements FunctionCounter, Removabl
@Override
public void onRemove() {
countMeasurementHandle.remove();
observableCount.close();
}
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")

View File

@ -16,8 +16,9 @@ import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.micrometer.core.instrument.util.TimeUtils;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleCounter;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
@ -29,8 +30,8 @@ final class OpenTelemetryFunctionTimer<T> implements FunctionTimer, RemovableMet
private final Id id;
private final TimeUnit baseTimeUnit;
private final AsyncMeasurementHandle countMeasurementHandle;
private final AsyncMeasurementHandle totalTimeMeasurementHandle;
private final ObservableLongCounter observableCount;
private final ObservableDoubleCounter observableTotalTime;
OpenTelemetryFunctionTimer(
Id id,
@ -40,33 +41,36 @@ final class OpenTelemetryFunctionTimer<T> implements FunctionTimer, RemovableMet
ToDoubleFunction<T> totalTimeFunction,
TimeUnit totalTimeFunctionUnit,
TimeUnit baseTimeUnit,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
this.id = id;
this.baseTimeUnit = baseTimeUnit;
String conventionName = name(id, namingConvention);
String name = name(id, namingConvention);
Attributes attributes = tagsAsAttributes(id, namingConvention);
countMeasurementHandle =
asyncInstrumentRegistry.buildLongCounter(
conventionName + ".count",
description(conventionName, id),
/* baseUnit = */ "1",
attributes,
obj,
countFunction);
this.observableCount =
otelMeter
.counterBuilder(name + ".count")
.setDescription(description(name, id))
.setUnit("1")
.buildWithCallback(new LongMeasurementRecorder<>(obj, countFunction, attributes));
totalTimeMeasurementHandle =
asyncInstrumentRegistry.buildDoubleCounter(
conventionName + ".sum",
description(conventionName, id),
getUnitString(baseTimeUnit),
attributes,
obj,
val ->
TimeUtils.convert(
totalTimeFunction.applyAsDouble(val), totalTimeFunctionUnit, baseTimeUnit));
this.observableTotalTime =
otelMeter
.counterBuilder(name + ".sum")
.ofDoubles()
.setDescription(description(name, id))
.setUnit(getUnitString(baseTimeUnit))
.buildWithCallback(
new DoubleMeasurementRecorder<>(
obj,
val ->
TimeUtils.convert(
totalTimeFunction.applyAsDouble(val),
totalTimeFunctionUnit,
baseTimeUnit),
attributes));
}
@Override
@ -105,8 +109,8 @@ final class OpenTelemetryFunctionTimer<T> implements FunctionTimer, RemovableMet
@Override
public void onRemove() {
countMeasurementHandle.remove();
totalTimeMeasurementHandle.remove();
observableCount.close();
observableTotalTime.close();
}
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")

View File

@ -14,8 +14,8 @@ import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import java.util.Collections;
import java.util.function.ToDoubleFunction;
import javax.annotation.Nullable;
@ -24,26 +24,26 @@ import javax.annotation.Nullable;
final class OpenTelemetryGauge<T> implements Gauge, RemovableMeter {
private final Id id;
private final AsyncMeasurementHandle gaugeMeasurementHandle;
private final ObservableDoubleGauge observableGauge;
OpenTelemetryGauge(
Id id,
NamingConvention namingConvention,
@Nullable T obj,
ToDoubleFunction<T> objMetric,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
this.id = id;
String conventionName = name(id, namingConvention);
gaugeMeasurementHandle =
asyncInstrumentRegistry.buildGauge(
conventionName,
description(conventionName, id),
baseUnit(id),
tagsAsAttributes(id, namingConvention),
obj,
objMetric);
String name = name(id, namingConvention);
observableGauge =
otelMeter
.gaugeBuilder(name)
.setDescription(description(name, id))
.setUnit(baseUnit(id))
.buildWithCallback(
new DoubleMeasurementRecorder<>(
obj, objMetric, tagsAsAttributes(id, namingConvention)));
}
@Override
@ -65,7 +65,7 @@ final class OpenTelemetryGauge<T> implements Gauge, RemovableMeter {
@Override
public void onRemove() {
gaugeMeasurementHandle.remove();
observableGauge.close();
}
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")

View File

@ -16,16 +16,17 @@ import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
final class OpenTelemetryLongTaskTimer extends DefaultLongTaskTimer implements RemovableMeter {
private final DistributionStatisticConfig distributionStatisticConfig;
private final AsyncMeasurementHandle activeTasksHandle;
private final AsyncMeasurementHandle durationHandle;
private final ObservableLongUpDownCounter observableActiveTasks;
private final ObservableDoubleUpDownCounter observableDuration;
OpenTelemetryLongTaskTimer(
Id id,
@ -33,29 +34,30 @@ final class OpenTelemetryLongTaskTimer extends DefaultLongTaskTimer implements R
Clock clock,
TimeUnit baseTimeUnit,
DistributionStatisticConfig distributionStatisticConfig,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
super(id, clock, baseTimeUnit, distributionStatisticConfig, false);
this.distributionStatisticConfig = distributionStatisticConfig;
String conventionName = name(id, namingConvention);
String name = name(id, namingConvention);
Attributes attributes = tagsAsAttributes(id, namingConvention);
this.activeTasksHandle =
asyncInstrumentRegistry.buildUpDownLongCounter(
conventionName + ".active",
description(conventionName, id),
"tasks",
attributes,
this,
DefaultLongTaskTimer::activeTasks);
this.durationHandle =
asyncInstrumentRegistry.buildUpDownDoubleCounter(
conventionName + ".duration",
description(conventionName, id),
getUnitString(baseTimeUnit),
attributes,
this,
t -> t.duration(baseTimeUnit));
this.observableActiveTasks =
otelMeter
.upDownCounterBuilder(name + ".active")
.setDescription(description(name, id))
.setUnit("tasks")
.buildWithCallback(
new LongMeasurementRecorder<>(this, DefaultLongTaskTimer::activeTasks, attributes));
this.observableDuration =
otelMeter
.upDownCounterBuilder(name + ".duration")
.ofDoubles()
.setDescription(description(name, id))
.setUnit(getUnitString(baseTimeUnit))
.buildWithCallback(
new DoubleMeasurementRecorder<>(
this, t -> t.duration(t.baseTimeUnit()), attributes));
}
@Override
@ -66,8 +68,8 @@ final class OpenTelemetryLongTaskTimer extends DefaultLongTaskTimer implements R
@Override
public void onRemove() {
activeTasksHandle.remove();
durationHandle.remove();
observableActiveTasks.close();
observableDuration.close();
}
boolean isUsingMicrometerHistograms() {

View File

@ -15,8 +15,6 @@ import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.util.MeterEquivalence;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -26,36 +24,46 @@ import javax.annotation.Nullable;
final class OpenTelemetryMeter implements Meter, RemovableMeter {
private final Id id;
private final List<AsyncMeasurementHandle> measurementHandles;
private final List<AutoCloseable> observableInstruments;
OpenTelemetryMeter(
Id id,
NamingConvention namingConvention,
Iterable<Measurement> measurements,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
io.opentelemetry.api.metrics.Meter otelMeter) {
this.id = id;
Attributes attributes = tagsAsAttributes(id, namingConvention);
List<AsyncMeasurementHandle> measurementHandles = new ArrayList<>();
List<AutoCloseable> observableInstruments = new ArrayList<>();
for (Measurement measurement : measurements) {
String name = statisticInstrumentName(id, measurement.getStatistic(), namingConvention);
String description = description(name, id);
String baseUnit = baseUnit(id);
DoubleMeasurementRecorder<Measurement> callback =
new DoubleMeasurementRecorder<>(measurement, Measurement::getValue, attributes);
switch (measurement.getStatistic()) {
case TOTAL:
// fall through
case TOTAL_TIME:
case COUNT:
measurementHandles.add(
asyncInstrumentRegistry.buildDoubleCounter(
name, description, baseUnit, attributes, measurement, Measurement::getValue));
observableInstruments.add(
otelMeter
.counterBuilder(name)
.ofDoubles()
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(callback));
break;
case ACTIVE_TASKS:
measurementHandles.add(
asyncInstrumentRegistry.buildUpDownDoubleCounter(
name, description, baseUnit, attributes, measurement, Measurement::getValue));
observableInstruments.add(
otelMeter
.upDownCounterBuilder(name)
.ofDoubles()
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(callback));
break;
case DURATION:
@ -63,13 +71,16 @@ final class OpenTelemetryMeter implements Meter, RemovableMeter {
case MAX:
case VALUE:
case UNKNOWN:
measurementHandles.add(
asyncInstrumentRegistry.buildGauge(
name, description, baseUnit, attributes, measurement, Measurement::getValue));
observableInstruments.add(
otelMeter
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(callback));
break;
}
}
this.measurementHandles = measurementHandles;
this.observableInstruments = observableInstruments;
}
@Override
@ -85,7 +96,13 @@ final class OpenTelemetryMeter implements Meter, RemovableMeter {
@Override
public void onRemove() {
measurementHandles.forEach(AsyncMeasurementHandle::remove);
try {
for (AutoCloseable observableInstrument : observableInstruments) {
observableInstrument.close();
}
} catch (Exception e) {
throw new IllegalStateException("SDK instruments should never throw on close()", e);
}
}
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")

View File

@ -21,7 +21,6 @@ import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramGauges;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
@ -52,14 +51,12 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
private final TimeUnit baseTimeUnit;
private final io.opentelemetry.api.metrics.Meter otelMeter;
private final AsyncInstrumentRegistry asyncInstrumentRegistry;
OpenTelemetryMeterRegistry(
Clock clock, TimeUnit baseTimeUnit, io.opentelemetry.api.metrics.Meter otelMeter) {
super(clock);
this.baseTimeUnit = baseTimeUnit;
this.otelMeter = otelMeter;
this.asyncInstrumentRegistry = AsyncInstrumentRegistry.getOrCreate(otelMeter);
this.config()
.namingConvention(NamingConvention.identity)
@ -68,8 +65,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
@Override
protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> valueFunction) {
return new OpenTelemetryGauge<>(
id, config().namingConvention(), obj, valueFunction, asyncInstrumentRegistry);
return new OpenTelemetryGauge<>(id, config().namingConvention(), obj, valueFunction, otelMeter);
}
@Override
@ -87,7 +83,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
clock,
getBaseTimeUnit(),
distributionStatisticConfig,
asyncInstrumentRegistry);
otelMeter);
if (timer.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(timer, this);
}
@ -107,8 +103,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
distributionStatisticConfig,
pauseDetector,
getBaseTimeUnit(),
otelMeter,
asyncInstrumentRegistry);
otelMeter);
if (timer.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(timer, this);
}
@ -120,13 +115,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
OpenTelemetryDistributionSummary distributionSummary =
new OpenTelemetryDistributionSummary(
id,
config().namingConvention(),
clock,
distributionStatisticConfig,
scale,
otelMeter,
asyncInstrumentRegistry);
id, config().namingConvention(), clock, distributionStatisticConfig, scale, otelMeter);
if (distributionSummary.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(distributionSummary, this);
}
@ -135,8 +124,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
@Override
protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable<Measurement> measurements) {
return new OpenTelemetryMeter(
id, config().namingConvention(), measurements, asyncInstrumentRegistry);
return new OpenTelemetryMeter(id, config().namingConvention(), measurements, otelMeter);
}
@Override
@ -154,14 +142,14 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
totalTimeFunction,
totalTimeFunctionUnit,
getBaseTimeUnit(),
asyncInstrumentRegistry);
otelMeter);
}
@Override
protected <T> FunctionCounter newFunctionCounter(
Meter.Id id, T obj, ToDoubleFunction<T> countFunction) {
return new OpenTelemetryFunctionCounter<>(
id, config().namingConvention(), obj, countFunction, asyncInstrumentRegistry);
id, config().namingConvention(), obj, countFunction, otelMeter);
}
@Override

View File

@ -22,8 +22,7 @@ import io.micrometer.core.instrument.util.TimeUtils;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
@ -36,7 +35,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final AsyncMeasurementHandle maxHandle;
private final ObservableDoubleGauge observableMax;
private volatile boolean removed = false;
@ -47,8 +46,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector,
TimeUnit baseTimeUnit,
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
Meter otelMeter) {
super(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS, false);
if (isUsingMicrometerHistograms()) {
@ -61,21 +59,20 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
this.baseTimeUnit = baseTimeUnit;
this.attributes = tagsAsAttributes(id, namingConvention);
String conventionName = name(id, namingConvention);
String name = name(id, namingConvention);
this.otelHistogram =
otelMeter
.histogramBuilder(conventionName)
.setDescription(description(conventionName, id))
.histogramBuilder(name)
.setDescription(description(name, id))
.setUnit(getUnitString(baseTimeUnit))
.build();
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
conventionName + ".max",
description(conventionName, id),
getUnitString(baseTimeUnit),
attributes,
max,
m -> m.poll(baseTimeUnit));
this.observableMax =
otelMeter
.gaugeBuilder(name + ".max")
.setDescription(description(name, id))
.setUnit(getUnitString(baseTimeUnit))
.buildWithCallback(
new DoubleMeasurementRecorder<>(max, m -> m.poll(baseTimeUnit), attributes));
}
boolean isUsingMicrometerHistograms() {
@ -117,7 +114,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
observableMax.close();
}
private interface Measurements {

View File

@ -80,7 +80,7 @@ public abstract class AbstractGaugeTest {
.register(Metrics.globalRegistry);
Gauge.builder("testGaugeWithTags", () -> 42)
.description("ignored")
.baseUnit("ignored")
.baseUnit("items")
.tags("tag", "2")
.register(Metrics.globalRegistry);