mirror of https://github.com/grpc/grpc-java.git
Add gauge metric API and Otel implementation
This is needed by gRFC A78 for xds metrics, and for RLS metrics. Since gauges need to acquire a lock (or other synchronization) in the callback, the callback allows batching multiple gauges together to avoid acquiring-and-requiring such locks. Unlike other metrics, gauges are reported on-demand to the MetricSink. This means not all sinks will receive the same data, as the sinks will ask for the gauges at different times.
This commit is contained in:
parent
8516cfef9c
commit
354b028cae
|
|
@ -0,0 +1,23 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2024 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tagging interface for MetricInstruments that can be used with batch callbacks.
|
||||||
|
*/
|
||||||
|
@Internal
|
||||||
|
public interface CallbackMetricInstrument extends MetricInstrument {}
|
||||||
|
|
@ -22,7 +22,8 @@ import java.util.List;
|
||||||
* Represents a long-valued gauge metric instrument.
|
* Represents a long-valued gauge metric instrument.
|
||||||
*/
|
*/
|
||||||
@Internal
|
@Internal
|
||||||
public final class LongGaugeMetricInstrument extends PartialMetricInstrument {
|
public final class LongGaugeMetricInstrument extends PartialMetricInstrument
|
||||||
|
implements CallbackMetricInstrument {
|
||||||
public LongGaugeMetricInstrument(int index, String name, String description, String unit,
|
public LongGaugeMetricInstrument(int index, String name, String description, String unit,
|
||||||
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
|
List<String> requiredLabelKeys, List<String> optionalLabelKeys, boolean enableByDefault) {
|
||||||
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
|
||||||
|
|
|
||||||
|
|
@ -67,4 +67,43 @@ public interface MetricRecorder {
|
||||||
*/
|
*/
|
||||||
default void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value,
|
default void recordLongHistogram(LongHistogramMetricInstrument metricInstrument, long value,
|
||||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {}
|
List<String> requiredLabelValues, List<String> optionalLabelValues) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers a callback to produce metric values for only the listed instruments. The returned
|
||||||
|
* registration must be closed when no longer needed, which will remove the callback.
|
||||||
|
*
|
||||||
|
* @param callback The callback to call to record.
|
||||||
|
* @param metricInstruments The metric instruments the callback will record against.
|
||||||
|
*/
|
||||||
|
default Registration registerBatchCallback(BatchCallback callback,
|
||||||
|
CallbackMetricInstrument... metricInstruments) {
|
||||||
|
return () -> { };
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Callback to record gauge values. */
|
||||||
|
interface BatchCallback {
|
||||||
|
/** Records instrument values into {@code recorder}. */
|
||||||
|
void accept(BatchRecorder recorder);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Recorder for instrument values produced by a batch callback. */
|
||||||
|
interface BatchRecorder {
|
||||||
|
/**
|
||||||
|
* Record a long gauge value.
|
||||||
|
*
|
||||||
|
* @param value The value to record.
|
||||||
|
* @param requiredLabelValues A list of required label values for the metric.
|
||||||
|
* @param optionalLabelValues A list of additional, optional label values for the metric.
|
||||||
|
*/
|
||||||
|
void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
|
||||||
|
List<String> requiredLabelValues, List<String> optionalLabelValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** A handle to a registration, that allows unregistration. */
|
||||||
|
interface Registration extends AutoCloseable {
|
||||||
|
// Redefined to not throw an exception.
|
||||||
|
/** Unregister. */
|
||||||
|
@Override
|
||||||
|
void close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -99,5 +99,30 @@ public interface MetricSink {
|
||||||
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a long gauge value.
|
||||||
|
*
|
||||||
|
* @param value The value to record.
|
||||||
|
* @param requiredLabelValues A list of required label values for the metric.
|
||||||
|
* @param optionalLabelValues A list of additional, optional label values for the metric.
|
||||||
|
*/
|
||||||
|
default void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
|
||||||
|
List<String> requiredLabelValues, List<String> optionalLabelValues){
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers a callback to produce metric values for only the listed instruments. The returned
|
||||||
|
* registration must be closed when no longer needed, which will remove the callback.
|
||||||
|
*
|
||||||
|
* @param callback The callback to call to record.
|
||||||
|
* @param metricInstruments The metric instruments the callback will record against.
|
||||||
|
*/
|
||||||
|
default Registration registerBatchCallback(Runnable callback,
|
||||||
|
CallbackMetricInstrument... metricInstruments) {
|
||||||
|
return () -> { };
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Registration extends MetricRecorder.Registration {}
|
||||||
|
|
||||||
void updateMeasures(List<MetricInstrument> instruments);
|
void updateMeasures(List<MetricInstrument> instruments);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,16 +17,21 @@
|
||||||
package io.grpc.internal;
|
package io.grpc.internal;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import io.grpc.CallbackMetricInstrument;
|
||||||
import io.grpc.DoubleCounterMetricInstrument;
|
import io.grpc.DoubleCounterMetricInstrument;
|
||||||
import io.grpc.DoubleHistogramMetricInstrument;
|
import io.grpc.DoubleHistogramMetricInstrument;
|
||||||
import io.grpc.LongCounterMetricInstrument;
|
import io.grpc.LongCounterMetricInstrument;
|
||||||
|
import io.grpc.LongGaugeMetricInstrument;
|
||||||
import io.grpc.LongHistogramMetricInstrument;
|
import io.grpc.LongHistogramMetricInstrument;
|
||||||
import io.grpc.MetricInstrument;
|
import io.grpc.MetricInstrument;
|
||||||
import io.grpc.MetricInstrumentRegistry;
|
import io.grpc.MetricInstrumentRegistry;
|
||||||
import io.grpc.MetricRecorder;
|
import io.grpc.MetricRecorder;
|
||||||
import io.grpc.MetricSink;
|
import io.grpc.MetricSink;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.BitSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -171,4 +176,62 @@ final class MetricRecorderImpl implements MetricRecorder {
|
||||||
sink.recordLongHistogram(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
sink.recordLongHistogram(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Registration registerBatchCallback(BatchCallback callback,
|
||||||
|
CallbackMetricInstrument... metricInstruments) {
|
||||||
|
long largestMetricInstrumentIndex = -1;
|
||||||
|
BitSet allowedInstruments = new BitSet();
|
||||||
|
for (CallbackMetricInstrument metricInstrument : metricInstruments) {
|
||||||
|
largestMetricInstrumentIndex =
|
||||||
|
Math.max(largestMetricInstrumentIndex, metricInstrument.getIndex());
|
||||||
|
allowedInstruments.set(metricInstrument.getIndex());
|
||||||
|
}
|
||||||
|
List<MetricSink.Registration> registrations = new ArrayList<>();
|
||||||
|
for (MetricSink sink : metricSinks) {
|
||||||
|
int measuresSize = sink.getMeasuresSize();
|
||||||
|
if (measuresSize <= largestMetricInstrumentIndex) {
|
||||||
|
// Measures may need updating in two cases:
|
||||||
|
// 1. When the sink is initially created with an empty list of measures.
|
||||||
|
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
|
||||||
|
sink.updateMeasures(registry.getMetricInstruments());
|
||||||
|
}
|
||||||
|
BatchRecorder singleSinkRecorder = new BatchRecorderImpl(sink, allowedInstruments);
|
||||||
|
registrations.add(sink.registerBatchCallback(
|
||||||
|
() -> callback.accept(singleSinkRecorder), metricInstruments));
|
||||||
|
}
|
||||||
|
return () -> {
|
||||||
|
for (MetricSink.Registration registration : registrations) {
|
||||||
|
registration.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Recorder for instrument values produced by a batch callback. */
|
||||||
|
static class BatchRecorderImpl implements BatchRecorder {
|
||||||
|
private final MetricSink sink;
|
||||||
|
private final BitSet allowedInstruments;
|
||||||
|
|
||||||
|
BatchRecorderImpl(MetricSink sink, BitSet allowedInstruments) {
|
||||||
|
this.sink = checkNotNull(sink, "sink");
|
||||||
|
this.allowedInstruments = checkNotNull(allowedInstruments, "allowedInstruments");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
|
||||||
|
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||||
|
checkArgument(allowedInstruments.get(metricInstrument.getIndex()),
|
||||||
|
"Instrument was not listed when registering callback: %s", metricInstrument);
|
||||||
|
checkArgument(requiredLabelValues != null
|
||||||
|
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
|
||||||
|
"Incorrect number of required labels provided. Expected: %s",
|
||||||
|
metricInstrument.getRequiredLabelKeys().size());
|
||||||
|
checkArgument(optionalLabelValues != null
|
||||||
|
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
|
||||||
|
"Incorrect number of optional labels provided. Expected: %s",
|
||||||
|
metricInstrument.getOptionalLabelKeys().size());
|
||||||
|
// Registering the callback checked that the instruments were be present in sink.
|
||||||
|
sink.recordLongGauge(metricInstrument, value, requiredLabelValues, optionalLabelValues);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
package io.grpc.internal;
|
package io.grpc.internal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyList;
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
@ -28,6 +30,7 @@ import com.google.common.collect.ImmutableList;
|
||||||
import io.grpc.DoubleCounterMetricInstrument;
|
import io.grpc.DoubleCounterMetricInstrument;
|
||||||
import io.grpc.DoubleHistogramMetricInstrument;
|
import io.grpc.DoubleHistogramMetricInstrument;
|
||||||
import io.grpc.LongCounterMetricInstrument;
|
import io.grpc.LongCounterMetricInstrument;
|
||||||
|
import io.grpc.LongGaugeMetricInstrument;
|
||||||
import io.grpc.LongHistogramMetricInstrument;
|
import io.grpc.LongHistogramMetricInstrument;
|
||||||
import io.grpc.MetricInstrumentRegistry;
|
import io.grpc.MetricInstrumentRegistry;
|
||||||
import io.grpc.MetricInstrumentRegistryAccessor;
|
import io.grpc.MetricInstrumentRegistryAccessor;
|
||||||
|
|
@ -40,6 +43,7 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit test for {@link MetricRecorderImpl}.
|
* Unit test for {@link MetricRecorderImpl}.
|
||||||
|
|
@ -72,6 +76,9 @@ public class MetricRecorderImplTest {
|
||||||
private final LongHistogramMetricInstrument longHistogramInstrument =
|
private final LongHistogramMetricInstrument longHistogramInstrument =
|
||||||
registry.registerLongHistogram("histogram2", DESCRIPTION, UNIT,
|
registry.registerLongHistogram("histogram2", DESCRIPTION, UNIT,
|
||||||
Collections.emptyList(), REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED);
|
Collections.emptyList(), REQUIRED_LABEL_KEYS, OPTIONAL_LABEL_KEYS, ENABLED);
|
||||||
|
private final LongGaugeMetricInstrument longGaugeInstrument =
|
||||||
|
registry.registerLongGauge("gauge0", DESCRIPTION, UNIT, REQUIRED_LABEL_KEYS,
|
||||||
|
OPTIONAL_LABEL_KEYS, ENABLED);
|
||||||
private MetricRecorder recorder;
|
private MetricRecorder recorder;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
|
@ -113,6 +120,34 @@ public class MetricRecorderImplTest {
|
||||||
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
|
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void recordCallback() {
|
||||||
|
MetricSink.Registration mockRegistration = mock(MetricSink.Registration.class);
|
||||||
|
when(mockSink.getMeasuresSize()).thenReturn(5);
|
||||||
|
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
|
||||||
|
.thenReturn(mockRegistration);
|
||||||
|
|
||||||
|
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
|
||||||
|
recorder.recordLongGauge(
|
||||||
|
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
|
||||||
|
}, longGaugeInstrument);
|
||||||
|
|
||||||
|
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
|
verify(mockSink, times(2))
|
||||||
|
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
|
||||||
|
|
||||||
|
callbackCaptor.getValue().run();
|
||||||
|
// Only once, for the one sink that called the callback.
|
||||||
|
verify(mockSink).recordLongGauge(
|
||||||
|
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, OPTIONAL_LABEL_VALUES);
|
||||||
|
|
||||||
|
verify(mockRegistration, never()).close();
|
||||||
|
registration.close();
|
||||||
|
verify(mockRegistration, times(2)).close();
|
||||||
|
|
||||||
|
verify(mockSink, never()).updateMeasures(registry.getMetricInstruments());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void newRegisteredMetricUpdateMeasures() {
|
public void newRegisteredMetricUpdateMeasures() {
|
||||||
// Sink is initialized with zero measures, should trigger updateMeasures() on sinks
|
// Sink is initialized with zero measures, should trigger updateMeasures() on sinks
|
||||||
|
|
@ -145,6 +180,16 @@ public class MetricRecorderImplTest {
|
||||||
verify(mockSink, times(8)).updateMeasures(registry.getMetricInstruments());
|
verify(mockSink, times(8)).updateMeasures(registry.getMetricInstruments());
|
||||||
verify(mockSink, times(2)).recordLongHistogram(eq(longHistogramInstrument), eq(99L),
|
verify(mockSink, times(2)).recordLongHistogram(eq(longHistogramInstrument), eq(99L),
|
||||||
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
eq(REQUIRED_LABEL_VALUES), eq(OPTIONAL_LABEL_VALUES));
|
||||||
|
|
||||||
|
// Callback
|
||||||
|
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
|
||||||
|
.thenReturn(mock(MetricSink.Registration.class));
|
||||||
|
MetricRecorder.Registration registration = recorder.registerBatchCallback(
|
||||||
|
(recorder) -> { }, longGaugeInstrument);
|
||||||
|
verify(mockSink, times(10)).updateMeasures(registry.getMetricInstruments());
|
||||||
|
verify(mockSink, times(2))
|
||||||
|
.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument));
|
||||||
|
registration.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
|
@ -179,6 +224,26 @@ public class MetricRecorderImplTest {
|
||||||
OPTIONAL_LABEL_VALUES);
|
OPTIONAL_LABEL_VALUES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void recordLongGaugeMismatchedRequiredLabelValues() {
|
||||||
|
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||||
|
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
|
||||||
|
.thenReturn(mock(MetricSink.Registration.class));
|
||||||
|
|
||||||
|
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
|
||||||
|
assertThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> recorder.recordLongGauge(
|
||||||
|
longGaugeInstrument, 99, ImmutableList.of(), OPTIONAL_LABEL_VALUES));
|
||||||
|
}, longGaugeInstrument);
|
||||||
|
|
||||||
|
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
|
verify(mockSink, times(2))
|
||||||
|
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
|
||||||
|
callbackCaptor.getValue().run();
|
||||||
|
registration.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalArgumentException.class)
|
@Test(expected = IllegalArgumentException.class)
|
||||||
public void addDoubleCounterMismatchedOptionalLabelValues() {
|
public void addDoubleCounterMismatchedOptionalLabelValues() {
|
||||||
when(mockSink.getMeasuresSize()).thenReturn(4);
|
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||||
|
|
@ -210,4 +275,24 @@ public class MetricRecorderImplTest {
|
||||||
recorder.recordLongHistogram(longHistogramInstrument, 99, REQUIRED_LABEL_VALUES,
|
recorder.recordLongHistogram(longHistogramInstrument, 99, REQUIRED_LABEL_VALUES,
|
||||||
ImmutableList.of());
|
ImmutableList.of());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void recordLongGaugeMismatchedOptionalLabelValues() {
|
||||||
|
when(mockSink.getMeasuresSize()).thenReturn(4);
|
||||||
|
when(mockSink.registerBatchCallback(any(Runnable.class), eq(longGaugeInstrument)))
|
||||||
|
.thenReturn(mock(MetricSink.Registration.class));
|
||||||
|
|
||||||
|
MetricRecorder.Registration registration = recorder.registerBatchCallback((recorder) -> {
|
||||||
|
assertThrows(
|
||||||
|
IllegalArgumentException.class,
|
||||||
|
() -> recorder.recordLongGauge(
|
||||||
|
longGaugeInstrument, 99, REQUIRED_LABEL_VALUES, ImmutableList.of()));
|
||||||
|
}, longGaugeInstrument);
|
||||||
|
|
||||||
|
ArgumentCaptor<Runnable> callbackCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
|
verify(mockSink, times(2))
|
||||||
|
.registerBatchCallback(callbackCaptor.capture(), eq(longGaugeInstrument));
|
||||||
|
callbackCaptor.getValue().run();
|
||||||
|
registration.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,19 +21,24 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import io.grpc.CallbackMetricInstrument;
|
||||||
import io.grpc.DoubleCounterMetricInstrument;
|
import io.grpc.DoubleCounterMetricInstrument;
|
||||||
import io.grpc.DoubleHistogramMetricInstrument;
|
import io.grpc.DoubleHistogramMetricInstrument;
|
||||||
import io.grpc.LongCounterMetricInstrument;
|
import io.grpc.LongCounterMetricInstrument;
|
||||||
|
import io.grpc.LongGaugeMetricInstrument;
|
||||||
import io.grpc.LongHistogramMetricInstrument;
|
import io.grpc.LongHistogramMetricInstrument;
|
||||||
import io.grpc.MetricInstrument;
|
import io.grpc.MetricInstrument;
|
||||||
import io.grpc.MetricSink;
|
import io.grpc.MetricSink;
|
||||||
import io.opentelemetry.api.common.Attributes;
|
import io.opentelemetry.api.common.Attributes;
|
||||||
import io.opentelemetry.api.common.AttributesBuilder;
|
import io.opentelemetry.api.common.AttributesBuilder;
|
||||||
|
import io.opentelemetry.api.metrics.BatchCallback;
|
||||||
import io.opentelemetry.api.metrics.DoubleCounter;
|
import io.opentelemetry.api.metrics.DoubleCounter;
|
||||||
import io.opentelemetry.api.metrics.DoubleHistogram;
|
import io.opentelemetry.api.metrics.DoubleHistogram;
|
||||||
import io.opentelemetry.api.metrics.LongCounter;
|
import io.opentelemetry.api.metrics.LongCounter;
|
||||||
import io.opentelemetry.api.metrics.LongHistogram;
|
import io.opentelemetry.api.metrics.LongHistogram;
|
||||||
import io.opentelemetry.api.metrics.Meter;
|
import io.opentelemetry.api.metrics.Meter;
|
||||||
|
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
|
||||||
|
import io.opentelemetry.api.metrics.ObservableMeasurement;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.BitSet;
|
import java.util.BitSet;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
@ -142,6 +147,48 @@ final class OpenTelemetryMetricSink implements MetricSink {
|
||||||
histogram.record(value, attributes);
|
histogram.record(value, attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recordLongGauge(LongGaugeMetricInstrument metricInstrument, long value,
|
||||||
|
List<String> requiredLabelValues, List<String> optionalLabelValues) {
|
||||||
|
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||||
|
if (instrumentData == null) {
|
||||||
|
// Disabled metric
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Attributes attributes = createAttributes(metricInstrument.getRequiredLabelKeys(),
|
||||||
|
metricInstrument.getOptionalLabelKeys(), requiredLabelValues, optionalLabelValues,
|
||||||
|
instrumentData.getOptionalLabelsBitSet());
|
||||||
|
ObservableLongMeasurement gauge = (ObservableLongMeasurement) instrumentData.getMeasure();
|
||||||
|
gauge.record(value, attributes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Registration registerBatchCallback(Runnable callback,
|
||||||
|
CallbackMetricInstrument... metricInstruments) {
|
||||||
|
List<ObservableMeasurement> measurements = new ArrayList<>(metricInstruments.length);
|
||||||
|
for (CallbackMetricInstrument metricInstrument: metricInstruments) {
|
||||||
|
MeasuresData instrumentData = measures.get(metricInstrument.getIndex());
|
||||||
|
if (instrumentData == null) {
|
||||||
|
// Disabled metric
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!(instrumentData.getMeasure() instanceof ObservableMeasurement)) {
|
||||||
|
logger.log(Level.FINE, "Unsupported metric instrument type : {0} {1}",
|
||||||
|
new Object[] {metricInstrument, instrumentData.getMeasure().getClass()});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
measurements.add((ObservableMeasurement) instrumentData.getMeasure());
|
||||||
|
}
|
||||||
|
if (measurements.isEmpty()) {
|
||||||
|
return () -> { };
|
||||||
|
}
|
||||||
|
ObservableMeasurement first = measurements.get(0);
|
||||||
|
measurements.remove(0);
|
||||||
|
BatchCallback closeable = openTelemetryMeter.batchCallback(
|
||||||
|
callback, first, measurements.toArray(new ObservableMeasurement[0]));
|
||||||
|
return closeable::close;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateMeasures(List<MetricInstrument> instruments) {
|
public void updateMeasures(List<MetricInstrument> instruments) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
|
@ -203,6 +250,12 @@ final class OpenTelemetryMetricSink implements MetricSink {
|
||||||
.setDescription(description)
|
.setDescription(description)
|
||||||
.ofLongs()
|
.ofLongs()
|
||||||
.build();
|
.build();
|
||||||
|
} else if (instrument instanceof LongGaugeMetricInstrument) {
|
||||||
|
openTelemetryMeasure = openTelemetryMeter.gaugeBuilder(name)
|
||||||
|
.setUnit(unit)
|
||||||
|
.setDescription(description)
|
||||||
|
.ofLongs()
|
||||||
|
.buildObserver();
|
||||||
} else {
|
} else {
|
||||||
logger.log(Level.FINE, "Unsupported metric instrument type : {0}", instrument);
|
logger.log(Level.FINE, "Unsupported metric instrument type : {0}", instrument);
|
||||||
openTelemetryMeasure = null;
|
openTelemetryMeasure = null;
|
||||||
|
|
@ -241,7 +294,6 @@ final class OpenTelemetryMetricSink implements MetricSink {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static final class MeasuresData {
|
static final class MeasuresData {
|
||||||
final BitSet optionalLabelsIndices;
|
final BitSet optionalLabelsIndices;
|
||||||
final Object measure;
|
final Object measure;
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableList;
|
||||||
import io.grpc.DoubleCounterMetricInstrument;
|
import io.grpc.DoubleCounterMetricInstrument;
|
||||||
import io.grpc.DoubleHistogramMetricInstrument;
|
import io.grpc.DoubleHistogramMetricInstrument;
|
||||||
import io.grpc.LongCounterMetricInstrument;
|
import io.grpc.LongCounterMetricInstrument;
|
||||||
|
import io.grpc.LongGaugeMetricInstrument;
|
||||||
import io.grpc.LongHistogramMetricInstrument;
|
import io.grpc.LongHistogramMetricInstrument;
|
||||||
import io.grpc.MetricInstrument;
|
import io.grpc.MetricInstrument;
|
||||||
|
import io.grpc.MetricSink;
|
||||||
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
|
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
|
||||||
import io.opentelemetry.api.common.AttributeKey;
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
import io.opentelemetry.api.metrics.DoubleCounter;
|
import io.opentelemetry.api.metrics.DoubleCounter;
|
||||||
|
|
@ -298,6 +300,79 @@ public class OpenTelemetryMetricSinkTest {
|
||||||
assertThat(openTelemetryTesting.getMetrics()).isEmpty();
|
assertThat(openTelemetryTesting.getMetrics()).isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void registerBatchCallback_allDisabled() {
|
||||||
|
// set up sink with disabled metric
|
||||||
|
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||||
|
|
||||||
|
LongGaugeMetricInstrument longGaugeInstrumentDisabled =
|
||||||
|
new LongGaugeMetricInstrument(0, "disk", "Amount of disk used", "By",
|
||||||
|
Collections.emptyList(), Collections.emptyList(), false);
|
||||||
|
|
||||||
|
// Create sink
|
||||||
|
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
|
||||||
|
|
||||||
|
// Invoke updateMeasures
|
||||||
|
sink.updateMeasures(Arrays.asList(longGaugeInstrumentDisabled));
|
||||||
|
|
||||||
|
MetricSink.Registration registration = sink.registerBatchCallback(() -> {
|
||||||
|
sink.recordLongGauge(
|
||||||
|
longGaugeInstrumentDisabled, 999, Collections.emptyList(), Collections.emptyList());
|
||||||
|
}, longGaugeInstrumentDisabled);
|
||||||
|
|
||||||
|
assertThat(openTelemetryTesting.getMetrics())
|
||||||
|
.satisfiesExactlyInAnyOrder();
|
||||||
|
registration.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void registerBatchCallback_bothEnabledAndDisabled() {
|
||||||
|
// set up sink with disabled metric
|
||||||
|
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||||
|
enabledMetrics.put("memory", true);
|
||||||
|
|
||||||
|
LongGaugeMetricInstrument longGaugeInstrumentEnabled =
|
||||||
|
new LongGaugeMetricInstrument(0, "memory", "Amount of memory used", "By",
|
||||||
|
Collections.emptyList(), Collections.emptyList(), false);
|
||||||
|
LongGaugeMetricInstrument longGaugeInstrumentDisabled =
|
||||||
|
new LongGaugeMetricInstrument(1, "disk", "Amount of disk used", "By",
|
||||||
|
Collections.emptyList(), Collections.emptyList(), false);
|
||||||
|
|
||||||
|
// Create sink
|
||||||
|
sink = new OpenTelemetryMetricSink(testMeter, enabledMetrics, false, Collections.emptyList());
|
||||||
|
|
||||||
|
// Invoke updateMeasures
|
||||||
|
sink.updateMeasures(Arrays.asList(longGaugeInstrumentEnabled, longGaugeInstrumentDisabled));
|
||||||
|
|
||||||
|
MetricSink.Registration registration = sink.registerBatchCallback(() -> {
|
||||||
|
sink.recordLongGauge(
|
||||||
|
longGaugeInstrumentEnabled, 99, Collections.emptyList(), Collections.emptyList());
|
||||||
|
sink.recordLongGauge(
|
||||||
|
longGaugeInstrumentDisabled, 999, Collections.emptyList(), Collections.emptyList());
|
||||||
|
}, longGaugeInstrumentEnabled, longGaugeInstrumentDisabled);
|
||||||
|
|
||||||
|
assertThat(openTelemetryTesting.getMetrics())
|
||||||
|
.satisfiesExactlyInAnyOrder(
|
||||||
|
metric ->
|
||||||
|
assertThat(metric)
|
||||||
|
.hasInstrumentationScope(InstrumentationScopeInfo.create(
|
||||||
|
OpenTelemetryConstants.INSTRUMENTATION_SCOPE))
|
||||||
|
.hasName("memory")
|
||||||
|
.hasDescription("Amount of memory used")
|
||||||
|
.hasUnit("By")
|
||||||
|
.hasLongGaugeSatisfying(
|
||||||
|
gauge ->
|
||||||
|
gauge.hasPointsSatisfying(
|
||||||
|
point ->
|
||||||
|
point
|
||||||
|
.hasValue(99))));
|
||||||
|
|
||||||
|
// Gauge goes away after close
|
||||||
|
registration.close();
|
||||||
|
assertThat(openTelemetryTesting.getMetrics())
|
||||||
|
.satisfiesExactlyInAnyOrder();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void recordLabels() {
|
public void recordLabels() {
|
||||||
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
Map<String, Boolean> enabledMetrics = new HashMap<>();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue