Stabilize MetricProducer, allow custom MetricReaders (#5835)
This commit is contained in:
parent
6c8f5435db
commit
f421ef1e73
|
@ -1,10 +1,21 @@
|
|||
Comparing source compatibility of against
|
||||
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.CollectionRegistration (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) java.util.Collection<io.opentelemetry.sdk.metrics.data.MetricData> collectAllMetrics()
|
||||
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.metrics.export.CollectionRegistration noop()
|
||||
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricExporter (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
|
||||
+++ NEW INTERFACE: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.metrics.export.MetricProducer (not serializable)
|
||||
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
|
||||
+++ NEW SUPERCLASS: java.lang.Object
|
||||
+++ NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.Collection<io.opentelemetry.sdk.metrics.data.MetricData> produce(io.opentelemetry.sdk.resources.Resource)
|
||||
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricReader (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
|
||||
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReader (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
|
||||
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder registerMetricProducer(io.opentelemetry.sdk.metrics.export.MetricProducer)
|
||||
|
|
|
@ -22,7 +22,6 @@ import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
|||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
|
@ -60,7 +59,7 @@ public final class PrometheusHttpServer implements MetricReader {
|
|||
|
||||
private final HttpServer server;
|
||||
private final ExecutorService executor;
|
||||
private volatile MetricProducer metricProducer = MetricProducer.noop();
|
||||
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
|
||||
|
||||
/**
|
||||
* Returns a new {@link PrometheusHttpServer} which can be registered to an {@link
|
||||
|
@ -83,7 +82,7 @@ public final class PrometheusHttpServer implements MetricReader {
|
|||
throw new UncheckedIOException("Could not create Prometheus HTTP server", e);
|
||||
}
|
||||
MetricsHandler metricsHandler =
|
||||
new MetricsHandler(() -> getMetricProducer().collectAllMetrics());
|
||||
new MetricsHandler(() -> collectionRegistration.collectAllMetrics());
|
||||
server.createContext("/", metricsHandler);
|
||||
server.createContext("/metrics", metricsHandler);
|
||||
server.createContext("/-/healthy", HealthHandler.INSTANCE);
|
||||
|
@ -110,10 +109,6 @@ public final class PrometheusHttpServer implements MetricReader {
|
|||
throw exception;
|
||||
}
|
||||
|
||||
private MetricProducer getMetricProducer() {
|
||||
return metricProducer;
|
||||
}
|
||||
|
||||
private void start() {
|
||||
// server.start must be called from a daemon thread for it to be a daemon.
|
||||
if (Thread.currentThread().isDaemon()) {
|
||||
|
@ -131,13 +126,13 @@ public final class PrometheusHttpServer implements MetricReader {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void register(CollectionRegistration registration) {
|
||||
this.metricProducer = MetricProducer.asMetricProducer(registration);
|
||||
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
|
||||
return AggregationTemporality.CUMULATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
|
||||
return AggregationTemporality.CUMULATIVE;
|
||||
public void register(CollectionRegistration registration) {
|
||||
this.collectionRegistration = registration;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,17 +26,18 @@ import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
|
|||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -56,7 +57,6 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
|
||||
class PrometheusHttpServerTest {
|
||||
private static final AtomicReference<List<MetricData>> metricData = new AtomicReference<>();
|
||||
private static final MetricProducer metricProducer = metricData::get;
|
||||
|
||||
static PrometheusHttpServer prometheusServer;
|
||||
static WebClient client;
|
||||
|
@ -68,7 +68,13 @@ class PrometheusHttpServerTest {
|
|||
static void beforeAll() {
|
||||
// Register the SDK metric producer with the prometheus reader.
|
||||
prometheusServer = PrometheusHttpServer.builder().setHost("localhost").setPort(0).build();
|
||||
prometheusServer.register(metricProducer);
|
||||
prometheusServer.register(
|
||||
new CollectionRegistration() {
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
return metricData.get();
|
||||
}
|
||||
});
|
||||
|
||||
client =
|
||||
WebClient.builder("http://localhost:" + prometheusServer.getAddress().getPort())
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opencensusshim;
|
||||
|
||||
import io.opencensus.metrics.Metrics;
|
||||
import io.opencensus.metrics.export.MetricProducerManager;
|
||||
import io.opentelemetry.opencensusshim.internal.metrics.MetricAdapter;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* {@link MetricProducer} for OpenCensus metrics, which allows {@link MetricReader}s to read from
|
||||
* both OpenTelemetry and OpenCensus metrics.
|
||||
*
|
||||
* <p>To use, register with {@link SdkMeterProviderBuilder#registerMetricProducer(MetricProducer)}.
|
||||
*/
|
||||
public final class OpenCensusMetricProducer implements MetricProducer {
|
||||
private final MetricProducerManager openCensusMetricStorage;
|
||||
|
||||
OpenCensusMetricProducer(MetricProducerManager openCensusMetricStorage) {
|
||||
this.openCensusMetricStorage = openCensusMetricStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new {@link OpenCensusMetricProducer} that reports against the given {@link
|
||||
* Resource}.
|
||||
*/
|
||||
public static MetricProducer create() {
|
||||
return new OpenCensusMetricProducer(Metrics.getExportComponent().getMetricProducerManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MetricData> produce(Resource resource) {
|
||||
List<MetricData> result = new ArrayList<>();
|
||||
openCensusMetricStorage
|
||||
.getAllMetricProducer()
|
||||
.forEach(
|
||||
producer ->
|
||||
producer
|
||||
.getMetrics()
|
||||
.forEach(metric -> result.add(MetricAdapter.convert(resource, metric))));
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opencensusshim.metrics;
|
||||
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/** Class that wraps multiple metric producers into one. */
|
||||
final class MultiMetricProducer implements MetricProducer {
|
||||
private final Collection<MetricProducer> producers;
|
||||
|
||||
public MultiMetricProducer(Collection<MetricProducer> producers) {
|
||||
this.producers = producers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
List<MetricData> result = new ArrayList<>();
|
||||
for (MetricProducer p : producers) {
|
||||
result.addAll(p.collectAllMetrics());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opencensusshim.metrics;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.util.Arrays;
|
||||
|
||||
/** {@link MetricReader} that appends OpenCensus metrics to anything read. */
|
||||
final class OpenCensusAttachingMetricReader implements MetricReader {
|
||||
private final MetricReader adapted;
|
||||
|
||||
OpenCensusAttachingMetricReader(MetricReader adapted) {
|
||||
this.adapted = adapted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(CollectionRegistration registration) {
|
||||
// TODO: Find a way to pull the resource off of the SDK.
|
||||
adapted.register(
|
||||
new MultiMetricProducer(
|
||||
Arrays.asList(
|
||||
MetricProducer.asMetricProducer(registration),
|
||||
OpenCensusMetricProducer.create(Resource.getDefault()))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
|
||||
return adapted.getAggregationTemporality(instrumentType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode forceFlush() {
|
||||
return adapted.forceFlush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown() {
|
||||
return adapted.shutdown();
|
||||
}
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opencensusshim.metrics;
|
||||
|
||||
import io.opencensus.metrics.Metrics;
|
||||
import io.opencensus.metrics.export.MetricProducerManager;
|
||||
import io.opentelemetry.opencensusshim.internal.metrics.MetricAdapter;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A producer instance of OpenCensus metrics.
|
||||
*
|
||||
* <p>The idea here is we can register a merged {@link MetricProducer} combining this with the
|
||||
* {@link SdkMeterProvider} producer with a {@link MetricReader}, allowing the reader to pull
|
||||
* metrics from both OpenTelemetry and OpenCensus backends.
|
||||
*/
|
||||
final class OpenCensusMetricProducer implements MetricProducer {
|
||||
private final Resource resource;
|
||||
private final MetricProducerManager openCensusMetricStorage;
|
||||
|
||||
OpenCensusMetricProducer(Resource resource, MetricProducerManager openCensusMetricStorage) {
|
||||
this.resource = resource;
|
||||
this.openCensusMetricStorage = openCensusMetricStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new {@link OpenCensusMetricProducer} that reports against the given {@link
|
||||
* Resource}.
|
||||
*/
|
||||
static MetricProducer create(Resource resource) {
|
||||
return new OpenCensusMetricProducer(
|
||||
resource, Metrics.getExportComponent().getMetricProducerManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
List<MetricData> result = new ArrayList<>();
|
||||
openCensusMetricStorage
|
||||
.getAllMetricProducer()
|
||||
.forEach(
|
||||
producer -> {
|
||||
producer
|
||||
.getMetrics()
|
||||
.forEach(metric -> result.add(MetricAdapter.convert(resource, metric)));
|
||||
});
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opencensusshim.metrics;
|
||||
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
|
||||
/** Convenience methods for adapting OpenCensus metrics into OpenTelemetry. */
|
||||
public final class OpenCensusMetrics {
|
||||
private OpenCensusMetrics() {}
|
||||
|
||||
/**
|
||||
* Attaches OpenCensus metrics to metrics read by the given input.
|
||||
*
|
||||
* @param input A {@link MetricReader} that will receive OpenCensus metrics.
|
||||
* @return The adapted MetricReaderFactory.
|
||||
*/
|
||||
public static MetricReader attachTo(MetricReader input) {
|
||||
return new OpenCensusAttachingMetricReader(input);
|
||||
}
|
||||
}
|
|
@ -21,7 +21,8 @@ import io.opencensus.trace.TraceId;
|
|||
import io.opencensus.trace.TraceOptions;
|
||||
import io.opencensus.trace.Tracestate;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.opencensusshim.OpenCensusMetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
|
@ -31,8 +32,7 @@ import org.awaitility.Awaitility;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class OpenCensusMetricProducerTest {
|
||||
private final MetricProducer openCensusMetrics =
|
||||
OpenCensusMetricProducer.create(Resource.empty());
|
||||
private final MetricProducer openCensusMetrics = OpenCensusMetricProducer.create();
|
||||
|
||||
private static final Measure.MeasureLong LATENCY_MS =
|
||||
Measure.MeasureLong.create("task_latency", "The task latency in milliseconds", "ms");
|
||||
|
@ -69,7 +69,7 @@ class OpenCensusMetricProducerTest {
|
|||
.atMost(Duration.ofSeconds(10))
|
||||
.untilAsserted(
|
||||
() ->
|
||||
assertThat(openCensusMetrics.collectAllMetrics())
|
||||
assertThat(openCensusMetrics.produce(Resource.empty()))
|
||||
.satisfiesExactly(
|
||||
metric ->
|
||||
assertThat(metric)
|
||||
|
|
|
@ -12,6 +12,7 @@ import io.opencensus.stats.Measure;
|
|||
import io.opencensus.stats.Stats;
|
||||
import io.opencensus.stats.StatsRecorder;
|
||||
import io.opencensus.stats.View;
|
||||
import io.opentelemetry.opencensusshim.OpenCensusMetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
|
||||
import java.time.Duration;
|
||||
|
@ -26,7 +27,10 @@ class OpenCensusMetricsTest {
|
|||
void capturesOpenCensusAndOtelMetrics() throws InterruptedException {
|
||||
InMemoryMetricReader reader = InMemoryMetricReader.create();
|
||||
SdkMeterProvider otelMetrics =
|
||||
SdkMeterProvider.builder().registerMetricReader(OpenCensusMetrics.attachTo(reader)).build();
|
||||
SdkMeterProvider.builder()
|
||||
.registerMetricReader(reader)
|
||||
.registerMetricProducer(OpenCensusMetricProducer.create())
|
||||
.build();
|
||||
// Record an otel metric.
|
||||
otelMetrics.meterBuilder("otel").build().counterBuilder("otel.sum").build().add(1);
|
||||
// Record an OpenCensus metric.
|
||||
|
@ -47,7 +51,7 @@ class OpenCensusMetricsTest {
|
|||
.untilAsserted(
|
||||
() ->
|
||||
assertThat(reader.collectAllMetrics())
|
||||
.satisfiesExactly(
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
metric ->
|
||||
assertThat(metric).hasName("otel.sum").hasLongSumSatisfying(sum -> {}),
|
||||
metric ->
|
||||
|
|
|
@ -421,6 +421,7 @@ class OpenTelemetrySdkTest {
|
|||
+ "clock=SystemClock{}, "
|
||||
+ "resource=Resource{schemaUrl=null, attributes={service.name=\"otel-test\"}}, "
|
||||
+ "metricReaders=[PeriodicMetricReader{exporter=MockMetricExporter{}, intervalNanos=60000000000}], "
|
||||
+ "metricProducers=[], "
|
||||
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}}]"
|
||||
+ "}, "
|
||||
+ "loggerProvider=SdkLoggerProvider{"
|
||||
|
|
|
@ -13,11 +13,12 @@ import io.opentelemetry.sdk.common.Clock;
|
|||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.internal.ComponentRegistry;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.RegisteredView;
|
||||
|
@ -45,6 +46,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
|
||||
private final List<RegisteredView> registeredViews;
|
||||
private final List<RegisteredReader> registeredReaders;
|
||||
private final List<MetricProducer> metricProducers;
|
||||
private final MeterProviderSharedState sharedState;
|
||||
private final ComponentRegistry<SdkMeter> registry;
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||
|
@ -57,6 +59,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
SdkMeterProvider(
|
||||
List<RegisteredView> registeredViews,
|
||||
IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders,
|
||||
List<MetricProducer> metricProducers,
|
||||
Clock clock,
|
||||
Resource resource,
|
||||
ExemplarFilter exemplarFilter) {
|
||||
|
@ -70,6 +73,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
entry.getKey(),
|
||||
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
|
||||
.collect(toList());
|
||||
this.metricProducers = metricProducers;
|
||||
this.sharedState =
|
||||
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
|
||||
this.registry =
|
||||
|
@ -77,8 +81,11 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
instrumentationLibraryInfo ->
|
||||
new SdkMeter(sharedState, instrumentationLibraryInfo, registeredReaders));
|
||||
for (RegisteredReader registeredReader : registeredReaders) {
|
||||
MetricProducer producer = new LeasedMetricProducer(registry, sharedState, registeredReader);
|
||||
registeredReader.getReader().register(producer);
|
||||
List<MetricProducer> readerMetricProducers = new ArrayList<>(metricProducers);
|
||||
readerMetricProducers.add(new LeasedMetricProducer(registry, sharedState, registeredReader));
|
||||
registeredReader
|
||||
.getReader()
|
||||
.register(new SdkCollectionRegistration(readerMetricProducers, sharedState));
|
||||
registeredReader.setLastCollectEpochNanos(startEpochNanos);
|
||||
}
|
||||
}
|
||||
|
@ -154,6 +161,8 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
+ sharedState.getResource()
|
||||
+ ", metricReaders="
|
||||
+ registeredReaders.stream().map(RegisteredReader::getReader).collect(toList())
|
||||
+ ", metricProducers="
|
||||
+ metricProducers
|
||||
+ ", views="
|
||||
+ registeredViews
|
||||
+ "}";
|
||||
|
@ -176,7 +185,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
public Collection<MetricData> produce(Resource unused) {
|
||||
Collection<SdkMeter> meters = registry.getComponents();
|
||||
List<MetricData> result = new ArrayList<>();
|
||||
long collectTime = sharedState.getClock().now();
|
||||
|
@ -187,4 +196,31 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
|
|||
return Collections.unmodifiableCollection(result);
|
||||
}
|
||||
}
|
||||
|
||||
private static class SdkCollectionRegistration implements CollectionRegistration {
|
||||
private final List<MetricProducer> metricProducers;
|
||||
private final MeterProviderSharedState sharedState;
|
||||
|
||||
private SdkCollectionRegistration(
|
||||
List<MetricProducer> metricProducers, MeterProviderSharedState sharedState) {
|
||||
this.metricProducers = metricProducers;
|
||||
this.sharedState = sharedState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
if (metricProducers.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
Resource resource = sharedState.getResource();
|
||||
if (metricProducers.size() == 1) {
|
||||
return metricProducers.get(0).produce(resource);
|
||||
}
|
||||
List<MetricData> metricData = new ArrayList<>();
|
||||
for (MetricProducer metricProducer : metricProducers) {
|
||||
metricData.addAll(metricProducer.produce(resource));
|
||||
}
|
||||
return Collections.unmodifiableList(metricData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
package io.opentelemetry.sdk.metrics;
|
||||
|
||||
import io.opentelemetry.sdk.common.Clock;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||||
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
|
||||
|
@ -36,6 +37,7 @@ public final class SdkMeterProviderBuilder {
|
|||
private Resource resource = Resource.getDefault();
|
||||
private final IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders =
|
||||
new IdentityHashMap<>();
|
||||
private final List<MetricProducer> metricProducers = new ArrayList<>();
|
||||
private final List<RegisteredView> registeredViews = new ArrayList<>();
|
||||
private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER;
|
||||
|
||||
|
@ -119,11 +121,7 @@ public final class SdkMeterProviderBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a {@link MetricReader}.
|
||||
*
|
||||
* <p>Note: custom implementations of {@link MetricReader} are not currently supported.
|
||||
*/
|
||||
/** Registers a {@link MetricReader}. */
|
||||
public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) {
|
||||
metricReaders.put(reader, CardinalityLimitSelector.defaultCardinalityLimitSelector());
|
||||
return this;
|
||||
|
@ -142,8 +140,19 @@ public final class SdkMeterProviderBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a {@link MetricProducer}.
|
||||
*
|
||||
* @since 1.31.0
|
||||
*/
|
||||
public SdkMeterProviderBuilder registerMetricProducer(MetricProducer metricProducer) {
|
||||
metricProducers.add(metricProducer);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Returns an {@link SdkMeterProvider} built with the configuration of this builder. */
|
||||
public SdkMeterProvider build() {
|
||||
return new SdkMeterProvider(registeredViews, metricReaders, clock, resource, exemplarFilter);
|
||||
return new SdkMeterProvider(
|
||||
registeredViews, metricReaders, metricProducers, clock, resource, exemplarFilter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,11 @@
|
|||
|
||||
package io.opentelemetry.sdk.metrics.export;
|
||||
|
||||
import io.opentelemetry.sdk.common.export.MemoryMode;
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* A {@link CollectionRegistration} is passed to each {@link MetricReader} registered with {@link
|
||||
|
@ -13,5 +17,29 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
|||
*
|
||||
* @since 1.14.0
|
||||
*/
|
||||
// TODO(jack-berg): Have methods when custom MetricReaders are supported
|
||||
public interface CollectionRegistration {}
|
||||
public interface CollectionRegistration {
|
||||
|
||||
/**
|
||||
* Returns a noop {@link CollectionRegistration}, useful for {@link MetricReader}s to hold before
|
||||
* {@link MetricReader#register(CollectionRegistration)} is called.
|
||||
*/
|
||||
static CollectionRegistration noop() {
|
||||
return new CollectionRegistration() {
|
||||
@Override
|
||||
public Collection<MetricData> collectAllMetrics() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect all metrics, including metrics from the SDK and any registered {@link MetricProducer}s.
|
||||
*
|
||||
* <p>If {@link MetricReader#getMemoryMode()} is configured to {@link MemoryMode#REUSABLE_DATA} do
|
||||
* not keep the result or any of its contained objects as they are to be reused to return the
|
||||
* result for the next call to this method.
|
||||
*/
|
||||
default Collection<MetricData> collectAllMetrics() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.metrics.export;
|
||||
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.util.Collection;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* {@link MetricProducer} is the interface that is used to make metric data available to the {@link
|
||||
* MetricReader}s. The primary implementation is provided by {@link
|
||||
* io.opentelemetry.sdk.metrics.SdkMeterProvider}.
|
||||
*
|
||||
* <p>Alternative {@link MetricProducer} implementations can be used to bridge aggregated metrics
|
||||
* from other frameworks, and are registered with {@link
|
||||
* SdkMeterProviderBuilder#registerMetricProducer(MetricProducer)}. NOTE: When possible, metrics
|
||||
* from other frameworks SHOULD be bridged using the metric API, normally with asynchronous
|
||||
* instruments which observe the aggregated state of the other framework. However, {@link
|
||||
* MetricProducer} exists to accommodate scenarios where the metric API is insufficient. It should
|
||||
* be used with caution as it requires the bridge to take a dependency on {@code
|
||||
* opentelemetry-sdk-metrics}, which is generally not advised.
|
||||
*
|
||||
* <p>Implementations must be thread-safe.
|
||||
*
|
||||
* @since 1.31.0
|
||||
*/
|
||||
@ThreadSafe
|
||||
public interface MetricProducer {
|
||||
|
||||
/**
|
||||
* Returns a collection of produced {@link MetricData}s to be exported. This will only be those
|
||||
* metrics that have been produced since the last time this method was called.
|
||||
*
|
||||
* @return a collection of produced {@link MetricData}s to be exported.
|
||||
*/
|
||||
Collection<MetricData> produce(Resource resource);
|
||||
}
|
|
@ -19,20 +19,15 @@ import java.util.concurrent.TimeUnit;
|
|||
/**
|
||||
* A metric reader reads metrics from an {@link SdkMeterProvider}.
|
||||
*
|
||||
* <p>Custom implementations of {@link MetricReader} are not currently supported. Please use one of
|
||||
* the built-in readers such as {@link PeriodicMetricReader}.
|
||||
*
|
||||
* @since 1.14.0
|
||||
*/
|
||||
public interface MetricReader
|
||||
extends AggregationTemporalitySelector, DefaultAggregationSelector, Closeable {
|
||||
|
||||
/**
|
||||
* Called by {@link SdkMeterProvider} and supplies the {@link MetricReader} with a handle to
|
||||
* collect metrics.
|
||||
*
|
||||
* <p>{@link CollectionRegistration} is currently an empty interface because custom
|
||||
* implementations of {@link MetricReader} are not currently supported.
|
||||
* Called by {@link SdkMeterProvider} on initialization to supply the {@link MetricReader} with
|
||||
* {@link MetricProducer}s used to collect metrics. {@link MetricReader} implementations call
|
||||
* {@link CollectionRegistration#collectAllMetrics()} to read metrics.
|
||||
*/
|
||||
void register(CollectionRegistration registration);
|
||||
|
||||
|
|
|
@ -13,7 +13,6 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
|||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
@ -40,8 +39,8 @@ public final class PeriodicMetricReader implements MetricReader {
|
|||
private final ScheduledExecutorService scheduler;
|
||||
private final Scheduled scheduled;
|
||||
private final Object lock = new Object();
|
||||
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
|
||||
|
||||
private volatile MetricProducer metricProducer = MetricProducer.noop();
|
||||
@Nullable private volatile ScheduledFuture<?> scheduledFuture;
|
||||
|
||||
/**
|
||||
|
@ -117,8 +116,8 @@ public final class PeriodicMetricReader implements MetricReader {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void register(CollectionRegistration registration) {
|
||||
this.metricProducer = MetricProducer.asMetricProducer(registration);
|
||||
public void register(CollectionRegistration collectionRegistration) {
|
||||
this.collectionRegistration = collectionRegistration;
|
||||
start();
|
||||
}
|
||||
|
||||
|
@ -159,7 +158,7 @@ public final class PeriodicMetricReader implements MetricReader {
|
|||
CompletableResultCode flushResult = new CompletableResultCode();
|
||||
if (exportAvailable.compareAndSet(true, false)) {
|
||||
try {
|
||||
Collection<MetricData> metricData = metricProducer.collectAllMetrics();
|
||||
Collection<MetricData> metricData = collectionRegistration.collectAllMetrics();
|
||||
if (metricData.isEmpty()) {
|
||||
logger.log(Level.FINE, "No metric data to export - skipping export.");
|
||||
flushResult.succeed();
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.metrics.internal.export;
|
||||
|
||||
import io.opentelemetry.sdk.common.export.MemoryMode;
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* {@code MetricProducer} is the interface that is used to make metric data available to the {@link
|
||||
* MetricReader}s. Implementations should be stateful, in that each call to {@link
|
||||
* #collectAllMetrics()} will return any metric generated since the last call was made.
|
||||
*
|
||||
* <p>Implementations must be thread-safe.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
@ThreadSafe
|
||||
public interface MetricProducer extends CollectionRegistration {
|
||||
|
||||
/** Cast the registration to a {@link MetricProducer}. */
|
||||
static MetricProducer asMetricProducer(CollectionRegistration registration) {
|
||||
if (!(registration instanceof MetricProducer)) {
|
||||
throw new IllegalArgumentException(
|
||||
"unrecognized CollectionRegistration, custom MetricReader implementations are not currently supported");
|
||||
}
|
||||
return (MetricProducer) registration;
|
||||
}
|
||||
|
||||
/** Return a noop {@link MetricProducer}. */
|
||||
static MetricProducer noop() {
|
||||
return Collections::emptyList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a collection of produced {@link MetricData}s to be exported. This will only be those
|
||||
* metrics that have been produced since the last time this method was called.
|
||||
*
|
||||
* <p>If {@link MetricReader#getMemoryMode()} is configured to {@link MemoryMode#REUSABLE_DATA} do
|
||||
* not keep the result or any of its contained objects as they are to be reused to return the
|
||||
* result for the next call of {@code collectAllMetrics}
|
||||
*
|
||||
* @return a collection of produced {@link MetricData}s to be exported.
|
||||
*/
|
||||
Collection<MetricData> collectAllMetrics();
|
||||
}
|
|
@ -8,6 +8,7 @@ package io.opentelemetry.sdk.metrics.internal.export;
|
|||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||
import io.opentelemetry.sdk.metrics.data.PointData;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
|
|
@ -26,7 +26,6 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
|
|||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
|
||||
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
|
@ -67,17 +66,19 @@ class PeriodicMetricReaderTest {
|
|||
ImmutableSumData.create(
|
||||
/* isMonotonic= */ true, AggregationTemporality.CUMULATIVE, LONG_POINT_LIST));
|
||||
|
||||
@Mock private MetricProducer metricProducer;
|
||||
@Mock private CollectionRegistration collectionRegistration;
|
||||
@Mock private MetricExporter metricExporter;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
when(metricProducer.collectAllMetrics()).thenReturn(Collections.singletonList(METRIC_DATA));
|
||||
when(collectionRegistration.collectAllMetrics())
|
||||
.thenReturn(Collections.singletonList(METRIC_DATA));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
void startOnlyOnce() {
|
||||
|
||||
ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class);
|
||||
|
||||
ScheduledFuture mock = mock(ScheduledFuture.class);
|
||||
|
@ -89,7 +90,7 @@ class PeriodicMetricReaderTest {
|
|||
.setExecutor(scheduler)
|
||||
.build();
|
||||
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
|
||||
verify(scheduler, times(1)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any());
|
||||
}
|
||||
|
@ -102,7 +103,7 @@ class PeriodicMetricReaderTest {
|
|||
.setInterval(Duration.ofMillis(100))
|
||||
.build();
|
||||
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
try {
|
||||
assertThat(waitingMetricExporter.waitForNumberOfExports(1))
|
||||
.containsExactly(Collections.singletonList(METRIC_DATA));
|
||||
|
@ -122,12 +123,12 @@ class PeriodicMetricReaderTest {
|
|||
PeriodicMetricReader.builder(waitingMetricExporter)
|
||||
.setInterval(Duration.ofMillis(100))
|
||||
.build();
|
||||
when(metricProducer.collectAllMetrics()).thenReturn(Collections.emptyList());
|
||||
reader.register(metricProducer);
|
||||
when(collectionRegistration.collectAllMetrics()).thenReturn(Collections.emptyList());
|
||||
reader.register(collectionRegistration);
|
||||
|
||||
try {
|
||||
assertThat(reader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
|
||||
verify(metricProducer).collectAllMetrics();
|
||||
verify(collectionRegistration).collectAllMetrics();
|
||||
assertThat(waitingMetricExporter.exportTimes.size()).isEqualTo(0);
|
||||
} finally {
|
||||
reader.shutdown();
|
||||
|
@ -142,7 +143,7 @@ class PeriodicMetricReaderTest {
|
|||
.setInterval(Duration.ofNanos(Long.MAX_VALUE))
|
||||
.build();
|
||||
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
assertThat(reader.forceFlush().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
|
||||
|
||||
try {
|
||||
|
@ -164,7 +165,7 @@ class PeriodicMetricReaderTest {
|
|||
.setInterval(Duration.ofMillis(100))
|
||||
.build();
|
||||
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
try {
|
||||
assertThat(waitingMetricExporter.waitForNumberOfExports(2))
|
||||
.containsExactly(
|
||||
|
@ -181,7 +182,7 @@ class PeriodicMetricReaderTest {
|
|||
PeriodicMetricReader.builder(waitingMetricExporter)
|
||||
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
|
||||
.build();
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
reader.shutdown();
|
||||
|
||||
// This export was called during shutdown.
|
||||
|
@ -198,7 +199,7 @@ class PeriodicMetricReaderTest {
|
|||
PeriodicMetricReader.builder(new WaitingMetricExporter())
|
||||
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
|
||||
.build());
|
||||
reader.register(metricProducer);
|
||||
reader.register(collectionRegistration);
|
||||
reader.close();
|
||||
|
||||
verify(reader, times(1)).shutdown();
|
||||
|
|
|
@ -17,7 +17,6 @@ import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
|
|||
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
|
||||
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
|
||||
import io.opentelemetry.sdk.metrics.export.MetricReader;
|
||||
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -54,7 +53,7 @@ public class InMemoryMetricReader implements MetricReader {
|
|||
private final AggregationTemporalitySelector aggregationTemporalitySelector;
|
||||
private final DefaultAggregationSelector defaultAggregationSelector;
|
||||
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
|
||||
private volatile MetricProducer metricProducer = MetricProducer.noop();
|
||||
private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop();
|
||||
private final MemoryMode memoryMode;
|
||||
|
||||
/**
|
||||
|
@ -111,12 +110,12 @@ public class InMemoryMetricReader implements MetricReader {
|
|||
if (isShutdown.get()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return metricProducer.collectAllMetrics();
|
||||
return collectionRegistration.collectAllMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(CollectionRegistration registration) {
|
||||
this.metricProducer = MetricProducer.asMetricProducer(registration);
|
||||
public void register(CollectionRegistration collectionRegistration) {
|
||||
this.collectionRegistration = collectionRegistration;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue