Add global::meter_provider_shutdown (#1623)

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
This commit is contained in:
Lalit Kumar Bhasin 2024-03-15 18:33:12 -07:00 committed by GitHub
parent a4fae95bd6
commit bae8fb3321
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 129 additions and 47 deletions

View File

@ -1,3 +1,4 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::Key;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
@ -7,7 +8,7 @@ use opentelemetry_sdk::metrics::{
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;
fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
// for example 1
let my_view_rename_and_unit = |i: &Instrument| {
if i.name == "my_histogram" {
@ -50,7 +51,7 @@ fn init_meter_provider() -> SdkMeterProvider {
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
@ -59,13 +60,14 @@ fn init_meter_provider() -> SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.build()
.build();
global::set_meter_provider(provider);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let meter_provider = init_meter_provider();
let meter = meter_provider.meter("mylibraryname");
init_meter_provider();
let meter = global::meter("mylibraryname");
// Example 1 - Rename metric using View.
// This instrument will be renamed to "my_histogram_renamed",
@ -151,6 +153,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}

View File

@ -1,32 +1,34 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;
fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
// uncomment the below lines to pretty print output.
// .with_encoder(|writer, data|
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"metrics-basic-example",
)]))
.build()
.build();
global::set_meter_provider(provider);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();
init_meter_provider();
// Create a meter from the above MeterProvider.
let meter = meter_provider.meter("mylibraryname");
let meter = global::meter("mylibraryname");
// Create a Counter Instrument.
let counter = meter.u64_counter("my_counter").init();
@ -146,6 +148,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}

View File

@ -1,13 +1,13 @@
use once_cell::sync::Lazy;
use opentelemetry::{
global, metrics,
global,
metrics::MetricsError,
trace::{TraceContextExt, TraceError, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs as sdklogs;
use opentelemetry_sdk::metrics as sdkmetrics;
use opentelemetry_sdk::resource;
use opentelemetry_sdk::trace as sdktrace;
@ -44,19 +44,20 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(opentelemetry_sdk::runtime::Tokio)
}
fn init_metrics() -> metrics::Result<sdkmetrics::SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: "http://localhost:4318/v1/metrics".to_string(),
..opentelemetry_otlp::ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.http()
.with_export_config(export_config),
)
.build()
.build();
provider.map(|_| ())
}
const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
@ -74,7 +75,7 @@ static COMMON_ATTRIBUTES: Lazy<[KeyValue; 4]> = Lazy::new(|| {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;
let _ = init_logs();
let tracer = global::tracer("ex.com/basic");
@ -108,7 +109,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
global::shutdown_tracer_provider();
global::shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}

View File

@ -1,18 +1,17 @@
use log::{info, Level};
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::global::{logger_provider, shutdown_logger_provider, shutdown_tracer_provider};
use opentelemetry::logs::LogError;
use opentelemetry::metrics::MetricsError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
metrics,
trace::{TraceContextExt, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::logs::Config;
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use std::error::Error;
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
@ -32,12 +31,12 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(runtime::Tokio)
}
fn init_metrics() -> metrics::Result<SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = ExportConfig {
endpoint: "http://localhost:4317".to_string(),
..ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
@ -48,7 +47,11 @@ fn init_metrics() -> metrics::Result<SdkMeterProvider> {
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"basic-otlp-metrics-example",
)]))
.build()
.build();
match provider {
Ok(_provider) => Ok(()),
Err(err) => Err(err),
}
}
fn init_logs() -> Result<opentelemetry_sdk::logs::Logger, LogError> {
@ -87,13 +90,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// matches the containing block, reporting traces and metrics during the whole
// execution.
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;
// Initialize logs, which sets the global loggerprovider.
let _ = init_logs();
// Retrieve the global LoggerProvider.
let logger_provider = logger_provider();
let logger_provider = global::logger_provider();
// Create a new OpenTelemetryLogBridge using the above LoggerProvider.
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
@ -137,9 +140,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
info!(target: "my-target", "hello from {}. My price is {}", "apple", 1.99);
shutdown_tracer_provider();
shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_tracer_provider();
global::shutdown_logger_provider();
global::shutdown_meter_provider();
Ok(())
}

View File

@ -6,7 +6,9 @@
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)
- [#1422](https://github.com/open-telemetry/opentelemetry-rust/pull/1422)
Fix metrics aggregation bug when using Views to drop attributes.
Fix metrics aggregation bug when using Views to drop attributes.
- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add Drop implementation for SdkMeterProvider, which shuts down
metricreaders, thereby allowing metrics still in memory to be flushed out.
## v0.22.1

View File

@ -9,6 +9,7 @@ use std::{
};
use opentelemetry::{
global,
metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result},
KeyValue,
};
@ -113,6 +114,13 @@ impl SdkMeterProvider {
}
}
impl Drop for SdkMeterProvider {
fn drop(&mut self) {
if let Err(err) = self.shutdown() {
global::handle_error(err);
}
}
}
impl MeterProvider for SdkMeterProvider {
fn versioned_meter(
&self,
@ -211,6 +219,7 @@ impl fmt::Debug for MeterProviderBuilder {
mod tests {
use crate::testing::metrics::metric_reader::TestMetricReader;
use crate::Resource;
use opentelemetry::global;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use std::env;
@ -228,14 +237,14 @@ mod tests {
expect.map(|s| s.to_string())
);
};
let reader = TestMetricReader {};
let reader = TestMetricReader::new();
let default_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader)
.build();
assert_service_name(default_meter_provider, Some("unknown_service"));
// If user provided a resource, use that.
let reader2 = TestMetricReader {};
let reader2 = TestMetricReader::new();
let custom_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader2)
.with_resource(Resource::new(vec![KeyValue::new(
@ -250,7 +259,7 @@ mod tests {
Some("key1=value1, k2, k3=value2"),
|| {
// If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
let reader3 = TestMetricReader {};
let reader3 = TestMetricReader::new();
let env_resource_provider = super::SdkMeterProvider::builder()
.with_reader(reader3)
.build();
@ -273,7 +282,7 @@ mod tests {
"OTEL_RESOURCE_ATTRIBUTES",
Some("my-custom-key=env-val,k2=value2"),
|| {
let reader4 = TestMetricReader {};
let reader4 = TestMetricReader::new();
let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
.with_reader(reader4)
.with_resource(Resource::default().merge(&mut Resource::new(vec![
@ -295,7 +304,7 @@ mod tests {
);
// If user provided a resource, it takes priority during collision.
let reader5 = TestMetricReader {};
let reader5 = TestMetricReader::new();
let no_service_name = super::SdkMeterProvider::builder()
.with_reader(reader5)
.with_resource(Resource::empty())
@ -303,4 +312,29 @@ mod tests {
assert_service_name(no_service_name, None);
}
#[test]
fn test_meter_provider_shutdown() {
let reader = TestMetricReader::new();
let provider = super::SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
global::set_meter_provider(provider.clone());
assert!(!provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(!reader.is_shutdown());
// create a meter and an instrument
let meter = global::meter("test");
let counter = meter.u64_counter("test_counter").init();
// no need to drop a meter for meter_provider shutdown
global::shutdown_meter_provider();
assert!(provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(reader.is_shutdown());
// TODO Fix: the instrument is still available, and can be used.
// While the reader is shutdown, and no collect is happening
counter.add(1, &[]);
}
}

View File

@ -1,4 +1,4 @@
use std::sync::Weak;
use std::sync::{Arc, Mutex, Weak};
use crate::metrics::{
aggregation::Aggregation,
@ -9,8 +9,30 @@ use crate::metrics::{
};
use opentelemetry::metrics::Result;
#[derive(Debug)]
pub struct TestMetricReader {}
#[derive(Debug, Clone)]
pub struct TestMetricReader {
is_shutdown: Arc<Mutex<bool>>,
}
impl TestMetricReader {
// Constructor to initialize the TestMetricReader
pub fn new() -> Self {
TestMetricReader {
is_shutdown: Arc::new(Mutex::new(false)),
}
}
// Method to check if the reader is shutdown
pub fn is_shutdown(&self) -> bool {
*self.is_shutdown.lock().unwrap()
}
}
impl Default for TestMetricReader {
fn default() -> Self {
Self::new()
}
}
impl MetricReader for TestMetricReader {
fn register_pipeline(&self, _pipeline: Weak<Pipeline>) {}
@ -24,7 +46,12 @@ impl MetricReader for TestMetricReader {
}
fn shutdown(&self) -> Result<()> {
self.force_flush()
let result = self.force_flush();
{
let mut is_shutdown = self.is_shutdown.lock().unwrap();
*is_shutdown = true;
}
result
}
}

View File

@ -2,7 +2,7 @@
#[cfg(all(feature = "metrics", feature = "trace"))]
use opentelemetry::{
metrics::MeterProvider as _,
global,
trace::{Span, Tracer, TracerProvider as _},
KeyValue,
};
@ -22,17 +22,18 @@ fn init_trace() -> TracerProvider {
}
#[cfg(all(feature = "metrics", feature = "trace"))]
fn init_metrics() -> SdkMeterProvider {
fn init_metrics() {
let exporter = opentelemetry_stdout::MetricsExporter::default();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder().with_reader(reader).build()
let provider = SdkMeterProvider::builder().with_reader(reader).build();
global::set_meter_provider(provider);
}
#[tokio::main]
#[cfg(all(feature = "metrics", feature = "trace"))]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracer_provider = init_trace();
let meter_provider = init_metrics();
init_metrics();
let tracer = tracer_provider.tracer("stdout-test");
let mut span = tracer.start("test_span");
@ -43,11 +44,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
span.end();
let meter = meter_provider.meter("stdout-test");
let meter = global::meter("stdout-test");
let c = meter.u64_counter("test_events").init();
c.add(1, &[KeyValue::new("test_key", "test_value")]);
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}

View File

@ -2,7 +2,12 @@
## vNext
### Added
- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add global::meter_provider_shutdown
### Removed
- Remove `urlencoding` crate dependency. [#1613](https://github.com/open-telemetry/opentelemetry-rust/pull/1613)
## v0.22.0

View File

@ -1,4 +1,4 @@
use crate::metrics::{self, Meter, MeterProvider};
use crate::metrics::{self, noop::NoopMeterProvider, Meter, MeterProvider};
use crate::KeyValue;
use core::fmt;
use once_cell::sync::Lazy;
@ -116,6 +116,11 @@ pub fn meter(name: impl Into<Cow<'static, str>>) -> Meter {
meter_provider().meter(name.into())
}
/// Shut down the current global [`MeterProvider`].
pub fn shutdown_meter_provider() {
set_meter_provider(NoopMeterProvider::new());
}
/// Creates a [`Meter`] with the name, version and schema url.
///
/// - name SHOULD uniquely identify the instrumentation scope, such as the instrumentation library (e.g. io.opentelemetry.contrib.mongodb), package, module or class name.