Add max measurements to Micrometer Timer & DistributionSummary (#5303)

This commit is contained in:
Mateusz Rzeszutek 2022-02-08 08:17:28 +01:00 committed by GitHub
parent b9fac11c90
commit 99f8c8d1cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 159 additions and 64 deletions

View File

@ -7,18 +7,22 @@ package io.opentelemetry.instrumentation.micrometer.v1_5;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.baseUnit; import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.baseUnit;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description; import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes; import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;
import io.micrometer.core.instrument.AbstractDistributionSummary; import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Measurement; import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram; import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax; import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.atomic.DoubleAdder; import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@ -26,10 +30,12 @@ import java.util.concurrent.atomic.LongAdder;
final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
implements DistributionSummary, RemovableMeter { implements DistributionSummary, RemovableMeter {
private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available // TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram; private final DoubleHistogram otelHistogram;
private final Attributes attributes; private final Attributes attributes;
private final Measurements measurements; private final AsyncMeasurementHandle maxHandle;
private volatile boolean removed = false; private volatile boolean removed = false;
@ -38,22 +44,32 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
Clock clock, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, DistributionStatisticConfig distributionStatisticConfig,
double scale, double scale,
Meter otelMeter) { Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, scale, false); super(id, clock, distributionStatisticConfig, scale, false);
if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);
this.attributes = tagsAsAttributes(id);
this.otelHistogram = this.otelHistogram =
otelMeter otelMeter
.histogramBuilder(id.getName()) .histogramBuilder(id.getName())
.setDescription(description(id)) .setDescription(description(id))
.setUnit(baseUnit(id)) .setUnit(baseUnit(id))
.build(); .build();
this.attributes = tagsAsAttributes(id); this.maxHandle =
asyncInstrumentRegistry.buildGauge(
if (isUsingMicrometerHistograms()) { statisticInstrumentName(id, Statistic.MAX),
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig); description(id),
} else { baseUnit(id),
measurements = NoopMeasurements.INSTANCE; attributes,
} max,
TimeWindowMax::poll);
} }
boolean isUsingMicrometerHistograms() { boolean isUsingMicrometerHistograms() {
@ -65,6 +81,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
if (amount >= 0 && !removed) { if (amount >= 0 && !removed) {
otelHistogram.record(amount, attributes); otelHistogram.record(amount, attributes);
measurements.record(amount); measurements.record(amount);
max.record(amount);
} }
} }
@ -80,7 +97,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
@Override @Override
public double max() { public double max() {
return measurements.max(); return max.poll();
} }
@Override @Override
@ -92,6 +109,7 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
@Override @Override
public void onRemove() { public void onRemove() {
removed = true; removed = true;
maxHandle.remove();
} }
private interface Measurements { private interface Measurements {
@ -100,8 +118,6 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
long count(); long count();
double totalAmount(); double totalAmount();
double max();
} }
// if micrometer histograms are not being used then there's no need to keep any local state // if micrometer histograms are not being used then there's no need to keep any local state
@ -123,32 +139,19 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
UnsupportedReadLogger.logWarning(); UnsupportedReadLogger.logWarning();
return Double.NaN; return Double.NaN;
} }
@Override
public double max() {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
} }
// calculate count, totalAmount and max value for the use of micrometer histograms // calculate count and totalAmount value for the use of micrometer histograms
// kinda similar to how DropwizardDistributionSummary does that // kinda similar to how DropwizardDistributionSummary does that
private static final class MicrometerHistogramMeasurements implements Measurements { private static final class MicrometerHistogramMeasurements implements Measurements {
private final LongAdder count = new LongAdder(); private final LongAdder count = new LongAdder();
private final DoubleAdder totalAmount = new DoubleAdder(); private final DoubleAdder totalAmount = new DoubleAdder();
private final TimeWindowMax max;
MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}
@Override @Override
public void record(double amount) { public void record(double amount) {
count.increment(); count.increment();
totalAmount.add(amount); totalAmount.add(amount);
max.record(amount);
} }
@Override @Override
@ -160,10 +163,5 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
public double totalAmount() { public double totalAmount() {
return totalAmount.sum(); return totalAmount.sum();
} }
@Override
public double max() {
return max.poll();
}
} }
} }

View File

@ -86,7 +86,13 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
DistributionStatisticConfig distributionStatisticConfig, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) { PauseDetector pauseDetector) {
OpenTelemetryTimer timer = OpenTelemetryTimer timer =
new OpenTelemetryTimer(id, clock, distributionStatisticConfig, pauseDetector, otelMeter); new OpenTelemetryTimer(
id,
clock,
distributionStatisticConfig,
pauseDetector,
otelMeter,
asyncInstrumentRegistry);
if (timer.isUsingMicrometerHistograms()) { if (timer.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(timer, this); HistogramGauges.registerWithCommonFormat(timer, this);
} }
@ -98,7 +104,7 @@ public final class OpenTelemetryMeterRegistry extends MeterRegistry {
Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) { Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
OpenTelemetryDistributionSummary distributionSummary = OpenTelemetryDistributionSummary distributionSummary =
new OpenTelemetryDistributionSummary( new OpenTelemetryDistributionSummary(
id, clock, distributionStatisticConfig, scale, otelMeter); id, clock, distributionStatisticConfig, scale, otelMeter, asyncInstrumentRegistry);
if (distributionSummary.isUsingMicrometerHistograms()) { if (distributionSummary.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(distributionSummary, this); HistogramGauges.registerWithCommonFormat(distributionSummary, this);
} }

View File

@ -6,11 +6,13 @@
package io.opentelemetry.instrumentation.micrometer.v1_5; package io.opentelemetry.instrumentation.micrometer.v1_5;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description; import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes; import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;
import io.micrometer.core.instrument.AbstractTimer; import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Measurement; import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram; import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax; import io.micrometer.core.instrument.distribution.TimeWindowMax;
@ -19,6 +21,8 @@ import io.micrometer.core.instrument.util.TimeUtils;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@ -27,10 +31,12 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
private static final double NANOS_PER_MS = TimeUnit.MILLISECONDS.toNanos(1); private static final double NANOS_PER_MS = TimeUnit.MILLISECONDS.toNanos(1);
private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available // TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram; private final DoubleHistogram otelHistogram;
private final Attributes attributes; private final Attributes attributes;
private final Measurements measurements; private final AsyncMeasurementHandle maxHandle;
private volatile boolean removed = false; private volatile boolean removed = false;
@ -39,22 +45,32 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
Clock clock, Clock clock,
DistributionStatisticConfig distributionStatisticConfig, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, PauseDetector pauseDetector,
Meter otelMeter) { Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS, false); super(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS, false);
if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);
this.attributes = tagsAsAttributes(id);
this.otelHistogram = this.otelHistogram =
otelMeter otelMeter
.histogramBuilder(id.getName()) .histogramBuilder(id.getName())
.setDescription(description(id)) .setDescription(description(id))
.setUnit("ms") .setUnit("ms")
.build(); .build();
this.attributes = tagsAsAttributes(id); this.maxHandle =
asyncInstrumentRegistry.buildGauge(
if (isUsingMicrometerHistograms()) { statisticInstrumentName(id, Statistic.MAX),
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig); description(id),
} else { "ms",
measurements = NoopMeasurements.INSTANCE; attributes,
} max,
m -> m.poll(TimeUnit.MILLISECONDS));
} }
boolean isUsingMicrometerHistograms() { boolean isUsingMicrometerHistograms() {
@ -68,6 +84,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
double time = nanos / NANOS_PER_MS; double time = nanos / NANOS_PER_MS;
otelHistogram.record(time, attributes); otelHistogram.record(time, attributes);
measurements.record(nanos); measurements.record(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
} }
} }
@ -83,7 +100,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
@Override @Override
public double max(TimeUnit unit) { public double max(TimeUnit unit) {
return measurements.max(unit); return max.poll(unit);
} }
@Override @Override
@ -95,6 +112,7 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
@Override @Override
public void onRemove() { public void onRemove() {
removed = true; removed = true;
maxHandle.remove();
} }
private interface Measurements { private interface Measurements {
@ -103,8 +121,6 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
long count(); long count();
double totalTime(TimeUnit unit); double totalTime(TimeUnit unit);
double max(TimeUnit unit);
} }
// if micrometer histograms are not being used then there's no need to keep any local state // if micrometer histograms are not being used then there's no need to keep any local state
@ -126,32 +142,19 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
UnsupportedReadLogger.logWarning(); UnsupportedReadLogger.logWarning();
return Double.NaN; return Double.NaN;
} }
@Override
public double max(TimeUnit unit) {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
} }
// calculate count, totalTime and max value for the use of micrometer histograms // calculate count and totalTime value for the use of micrometer histograms
// kinda similar to how DropwizardTimer does that // kinda similar to how DropwizardTimer does that
private static final class MicrometerHistogramMeasurements implements Measurements { private static final class MicrometerHistogramMeasurements implements Measurements {
private final LongAdder count = new LongAdder(); private final LongAdder count = new LongAdder();
private final LongAdder totalTime = new LongAdder(); private final LongAdder totalTime = new LongAdder();
private final TimeWindowMax max;
MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}
@Override @Override
public void record(long nanos) { public void record(long nanos) {
count.increment(); count.increment();
totalTime.add(nanos); totalTime.add(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
} }
@Override @Override
@ -163,10 +166,5 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
public double totalTime(TimeUnit unit) { public double totalTime(TimeUnit unit) {
return TimeUtils.nanosToUnit(totalTime.sum(), unit); return TimeUtils.nanosToUnit(totalTime.sum(), unit);
} }
@Override
public double max(TimeUnit unit) {
return max.poll(unit);
}
} }
} }

View File

@ -11,6 +11,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attri
import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Metrics;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.assertj.core.api.AbstractIterableAssert;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -169,4 +170,50 @@ public abstract class AbstractDistributionSummaryTest {
point -> point ->
assertThat(point).attributes().containsEntry("phi", "0.99")))); assertThat(point).attributes().containsEntry("phi", "0.99"))));
} }
@Test
void testMicrometerMax() throws InterruptedException {
// given
DistributionSummary summary =
DistributionSummary.builder("testSummaryMax")
.description("This is a test distribution summary")
.baseUnit("things")
.tags("tag", "value")
.register(Metrics.globalRegistry);
// when
summary.record(1);
summary.record(2);
summary.record(4);
// then
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME,
"testSummaryMax.max",
metrics ->
metrics.anySatisfy(
metric ->
assertThat(metric)
.hasDescription("This is a test distribution summary")
.hasDoubleGauge()
.points()
.anySatisfy(
point ->
assertThat(point)
.hasValue(4)
.attributes()
.containsEntry("tag", "value"))));
// when
Metrics.globalRegistry.remove(summary);
Thread.sleep(10); // give time for any inflight metric export to be received
testing().clearData();
// then
Thread.sleep(100); // interval of the test metrics exporter
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME, "testSummaryMax.max", AbstractIterableAssert::isEmpty);
}
} }

View File

@ -13,6 +13,7 @@ import io.micrometer.core.instrument.Timer;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractIterableAssert;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -197,4 +198,49 @@ public abstract class AbstractTimerTest {
point -> point ->
assertThat(point).attributes().containsEntry("phi", "0.99")))); assertThat(point).attributes().containsEntry("phi", "0.99"))));
} }
@Test
void testMicrometerMax() throws InterruptedException {
// given
Timer timer =
Timer.builder("testTimerMax")
.description("This is a test timer")
.tags("tag", "value")
.register(Metrics.globalRegistry);
// when
timer.record(1, TimeUnit.SECONDS);
timer.record(2, TimeUnit.SECONDS);
timer.record(4, TimeUnit.SECONDS);
// then
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME,
"testTimerMax.max",
metrics ->
metrics.anySatisfy(
metric ->
assertThat(metric)
.hasDescription("This is a test timer")
.hasDoubleGauge()
.points()
.anySatisfy(
point ->
assertThat(point)
.hasValue(4_000)
.attributes()
.containsEntry("tag", "value"))));
// when
Metrics.globalRegistry.remove(timer);
Thread.sleep(10); // give time for any inflight metric export to be received
testing().clearData();
// then
Thread.sleep(100); // interval of the test metrics exporter
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME, "testTimerMax.max", AbstractIterableAssert::isEmpty);
}
} }