chore(metrics): use kubert-prometheus-process metrics (#3742)

kubert-prometheus-process is a new crate that includes all of Linkerd's system
metrics and more. Using it also avoids annoying build issues on non-Linux
systems.
This commit is contained in:
Oliver Gould 2025-03-11 15:37:17 -07:00 committed by GitHub
parent 575ba0008b
commit 46babf0849
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 24 additions and 344 deletions

View File

@ -1231,6 +1231,18 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kubert-prometheus-process"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b89e2a641a3f74c2e7366eb050282ac4a6194b63dae5294084215c457237e47"
dependencies = [
"libc",
"procfs",
"prometheus-client",
"tracing",
]
[[package]]
name = "kubert-prometheus-tokio"
version = "0.2.0"
@ -1272,7 +1284,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
"windows-targets 0.52.0",
"windows-targets 0.48.5",
]
[[package]]
@ -2029,9 +2041,9 @@ dependencies = [
"http",
"http-body",
"hyper",
"kubert-prometheus-process",
"linkerd-http-box",
"linkerd-stack",
"linkerd-system",
"parking_lot",
"prometheus-client",
"quickcheck",
@ -2549,15 +2561,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "linkerd-system"
version = "0.1.0"
dependencies = [
"libc",
"procfs",
"tracing",
]
[[package]]
name = "linkerd-tls"
version = "0.1.0"

View File

@ -78,7 +78,6 @@ members = [
"linkerd/stack",
"linkerd/stack/metrics",
"linkerd/stack/tracing",
"linkerd/system",
"linkerd/tonic-stream",
"linkerd/tonic-watch",
"linkerd/tls",

View File

@ -35,7 +35,7 @@ use tokio::{
sync::mpsc,
time::{self, Duration},
};
use tracing::{debug, info, info_span, Instrument};
use tracing::{debug, error, info, info_span, Instrument};
/// Spawns a sidecar proxy.
///
@ -293,7 +293,10 @@ impl Config {
})
};
metrics::process::register(registry.sub_registry_with_prefix("process"));
if let Err(error) = metrics::process::register(registry.sub_registry_with_prefix("process"))
{
error!(%error, "Failed to register process metrics");
}
registry.register("proxy_build_info", "Proxy build info", BUILD_INFO.metric());
let admin = {

View File

@ -8,7 +8,7 @@ publish = false
[features]
default = []
process = ["dep:linkerd-system"]
process = ["dep:kubert-prometheus-process"]
stack = ["dep:linkerd-stack"]
test_util = []
@ -17,6 +17,7 @@ deflate = { version = "1", features = ["gzip"] }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, features = ["deprecated", "http1", "http2"] }
kubert-prometheus-process = { version = "0.2", optional = true }
parking_lot = "0.12"
prometheus-client = "0.23"
tokio = { version = "1", features = ["time"] }
@ -24,7 +25,6 @@ tracing = "0.1"
linkerd-http-box = { path = "../http/box" }
linkerd-stack = { path = "../stack", optional = true }
linkerd-system = { path = "../system", optional = true }
[dev-dependencies]
quickcheck = { version = "1", default-features = false }

View File

@ -10,11 +10,12 @@ mod histogram;
pub mod latency;
#[cfg(feature = "stack")]
mod new_metrics;
#[cfg(feature = "process")]
pub mod process;
mod serve;
mod store;
#[cfg(feature = "process")]
pub use kubert_prometheus_process as process;
#[cfg(feature = "stack")]
pub use self::new_metrics::NewMetrics;
pub use self::{

View File

@ -1,232 +0,0 @@
use crate::prom::{self, encoding::EncodeMetric};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::Instant;
/// Registers the process-level metrics on `reg`.
///
/// The following are registered, in this order:
///
/// - `start_time`: a constant gauge holding the wall-clock time (seconds since
///   the UNIX epoch) sampled when this function is called;
/// - `clock_time`: a gauge backed by `ClockMetric`;
/// - a `ProcessCollector`, which encodes `uptime` and, on Linux, the
///   system-level process metrics.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the UNIX epoch.
pub fn register(reg: &mut prom::Registry) {
    // The monotonic clock is sampled first; uptime is later measured against it.
    let start_time = Instant::now();
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("process start time");

    reg.register_with_unit(
        "start_time",
        "Time that the process started (in seconds since the UNIX epoch)",
        prom::Unit::Seconds,
        prom::ConstGauge::new(since_epoch.as_secs_f64()),
    );

    reg.register_with_unit(
        "clock_time",
        "Current system time for this proxy",
        prom::Unit::Seconds,
        prom::Gauge::<f64, ClockMetric>::default(),
    );

    let collector = ProcessCollector {
        start_time,
        #[cfg(target_os = "linux")]
        system: linux::System::new(),
    };
    reg.register_collector(Box::new(collector));

    tracing::debug!("Process metrics registered");

    #[cfg(not(target_os = "linux"))]
    tracing::debug!("System-level process metrics are only supported on Linux");
}
/// Collector registered by `register` that encodes process uptime and, on
/// Linux, the system-level process metrics.
#[derive(Debug)]
struct ProcessCollector {
/// Monotonic instant captured at registration; uptime is measured from it.
start_time: Instant,
/// Source of the Linux-only process statistics.
#[cfg(target_os = "linux")]
system: linux::System,
}
impl prom::collector::Collector for ProcessCollector {
    /// Encodes the `uptime` counter and then, on Linux, delegates to the
    /// system collector for the remaining process metrics.
    fn encode(&self, mut encoder: prom::encoding::DescriptorEncoder<'_>) -> std::fmt::Result {
        // Uptime is recomputed on every encode; the saturating subtraction
        // yields zero rather than panicking if `start_time` is in the future.
        let elapsed = Instant::now().saturating_duration_since(self.start_time);

        let uptime_encoder = encoder.encode_descriptor(
            "uptime",
            "Total time since the process started (in seconds)",
            Some(&prom::Unit::Seconds),
            prom::metrics::MetricType::Counter,
        )?;
        prom::ConstCounter::new(elapsed.as_secs_f64()).encode(uptime_encoder)?;

        #[cfg(target_os = "linux")]
        self.system.encode(encoder)?;

        Ok(())
    }
}
/// Gauge backend that holds no state: every read of its value samples the
/// current system time instead.
#[derive(Copy, Clone, Debug, Default)]
struct ClockMetric;
impl prom::GaugeAtomic<f64> for ClockMetric {
    /// Returns the current system time as whole seconds relative to the UNIX
    /// epoch.
    ///
    /// If the system clock is before the epoch, a warning is logged and the
    /// (whole-second) offset is returned as a negative number.
    fn get(&self) -> f64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs_f64().floor())
            .unwrap_or_else(|e| {
                tracing::warn!(
                    "System time is before the UNIX epoch; reporting negative timestamp"
                );
                -e.duration().as_secs_f64().floor()
            })
    }

    // The clock cannot be written to: every mutator ignores its argument and
    // simply reports the current reading.

    fn set(&self, _v: f64) -> f64 {
        self.get()
    }

    fn inc(&self) -> f64 {
        self.get()
    }

    fn inc_by(&self, _v: f64) -> f64 {
        self.get()
    }

    fn dec(&self) -> f64 {
        self.get()
    }

    fn dec_by(&self, _v: f64) -> f64 {
        self.get()
    }
}
#[cfg(target_os = "linux")]
mod linux {
use crate::prom::{self, encoding::EncodeMetric};
use linkerd_system as sys;
use tokio::time::Duration;
/// Holds the system constants needed to convert raw process statistics into
/// metric values.
#[derive(Clone, Debug)]
pub(super) struct System {
/// Page size as reported by `sys::page_size()`, or `None` if that lookup
/// failed; multiplied with the RSS page count when encoding.
page_size: Option<u64>,
/// Milliseconds per clock tick as reported by `sys::ms_per_tick()`, or
/// `None` if that lookup failed; multiplied with CPU ticks when encoding.
ms_per_tick: Option<u64>,
}
impl System {
    /// Looks up the page size and the clock-tick duration once.
    ///
    /// A failed lookup is logged at debug level and stored as `None`; it never
    /// prevents construction.
    pub fn new() -> Self {
        let page_size = sys::page_size()
            .map_err(|err| tracing::debug!("Failed to load page size: {}", err))
            .ok();

        let ms_per_tick = sys::ms_per_tick()
            .map_err(|err| tracing::debug!("Failed to load cpu clock speed: {}", err))
            .ok();

        Self {
            page_size,
            ms_per_tick,
        }
    }
}
impl prom::collector::Collector for System {
    /// Encodes, in order: `cpu`, `virtual_memory`, `resident_memory`,
    /// `open_fds`, `max_fds` and `threads`.
    ///
    /// A metric whose input is unavailable is skipped with a log message
    /// instead of failing the whole encode. Only encoder errors are returned.
    fn encode(&self, mut encoder: prom::encoding::DescriptorEncoder<'_>) -> std::fmt::Result {
        // Without a stat snapshot nothing is emitted at all, including the
        // metrics that do not depend on it.
        let stat = match sys::blocking_stat() {
            Err(error) => {
                tracing::warn!(%error, "Failed to read process stats");
                return Ok(());
            }
            Ok(snapshot) => snapshot,
        };

        // CPU time: (user + system) ticks scaled to milliseconds.
        match self.ms_per_tick {
            Some(mpt) => {
                let total_ticks = stat.utime + stat.stime;
                let cpu_seconds = Duration::from_millis(total_ticks * mpt).as_secs_f64();
                let cpu_encoder = encoder.encode_descriptor(
                    "cpu",
                    "Total user and system CPU time spent in seconds",
                    Some(&prom::Unit::Seconds),
                    prom::metrics::MetricType::Counter,
                )?;
                prom::ConstCounter::new(cpu_seconds).encode(cpu_encoder)?;
            }
            None => {
                tracing::debug!("Could not determine CPU usage");
            }
        }

        // Virtual memory size is taken from the snapshot as-is.
        let vm_encoder = encoder.encode_descriptor(
            "virtual_memory",
            "Virtual memory size in bytes",
            Some(&prom::Unit::Bytes),
            prom::metrics::MetricType::Gauge,
        )?;
        prom::ConstGauge::new(stat.vsize as i64).encode(vm_encoder)?;

        // Resident memory: the snapshot's RSS multiplied by the page size.
        match self.page_size {
            Some(ps) => {
                let resident_bytes = (stat.rss * ps) as i64;
                let rss_encoder = encoder.encode_descriptor(
                    "resident_memory",
                    "Resident memory size in bytes",
                    Some(&prom::Unit::Bytes),
                    prom::metrics::MetricType::Gauge,
                )?;
                prom::ConstGauge::new(resident_bytes).encode(rss_encoder)?;
            }
            None => {
                tracing::debug!("Could not determine RSS");
            }
        }

        // Open file descriptors for the pid recorded in the snapshot.
        match sys::open_fds(stat.pid) {
            Err(error) => {
                tracing::warn!(%error, "Could not determine open fds");
            }
            Ok(open_fds) => {
                let open_encoder = encoder.encode_descriptor(
                    "open_fds",
                    "Number of open file descriptors",
                    None,
                    prom::metrics::MetricType::Gauge,
                )?;
                prom::ConstGauge::new(open_fds as i64).encode(open_encoder)?;
            }
        }

        // File descriptor limit.
        match sys::max_fds() {
            Err(error) => {
                tracing::warn!(%error, "Could not determine max fds");
            }
            Ok(max_fds) => {
                let max_encoder = encoder.encode_descriptor(
                    "max_fds",
                    "Maximum number of open file descriptors",
                    None,
                    prom::metrics::MetricType::Gauge,
                )?;
                prom::ConstGauge::new(max_fds as i64).encode(max_encoder)?;
            }
        }

        // Thread count is taken from the snapshot as-is.
        let threads_encoder = encoder.encode_descriptor(
            "threads",
            "Number of OS threads in the process.",
            None,
            prom::metrics::MetricType::Gauge,
        )?;
        prom::ConstGauge::new(stat.num_threads).encode(threads_encoder)?;

        Ok(())
    }
}
}

View File

@ -1,17 +0,0 @@
[package]
name = "linkerd-system"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Unsafe code for accessing system-level counters for memory & CPU usage.
"""
[dependencies]
tracing = "0.1"
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"
procfs = { version = "0.17.0", default-features = false }

View File

@ -1,12 +0,0 @@
//! Unsafe code for accessing system-level counters for memory & CPU usage.
#![deny(rust_2018_idioms, clippy::disallowed_methods, unsafe_code)]
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
pub use self::linux::{blocking_stat, max_fds, ms_per_tick, open_fds, page_size, Stat};
#[cfg(not(target_os = "linux"))]
compile_error!("The system crate requires Linux");

View File

@ -1,65 +0,0 @@
use libc::pid_t;
use procfs::{
process::{self, LimitValue, Process},
ProcResult,
};
use std::{fs, io};
use tracing::{error, warn};
pub use process::Stat;
/// Returns the value `sysconf` reports for `_SC_PAGESIZE`.
///
/// # Errors
///
/// Propagates the error produced by `sysconf` when the lookup fails.
pub fn page_size() -> io::Result<u64> {
    let bytes = sysconf(libc::_SC_PAGESIZE, "page size")?;
    Ok(bytes)
}
/// Returns the number of whole milliseconds per clock tick, derived from
/// `_SC_CLK_TCK`.
///
/// The result is computed with integer division, so it is exact only when the
/// tick rate divides 1000. A warning is logged whenever the rate is not the
/// usual `100`.
///
/// # Errors
///
/// Propagates the error produced by `sysconf` when the lookup fails.
pub fn ms_per_tick() -> io::Result<u64> {
    // `sysconf` only returns `Ok` for strictly positive values, so the
    // division below cannot divide by zero.
    sysconf(libc::_SC_CLK_TCK, "clock ticks per second").map(|clock_ticks_per_sec| {
        // On Linux, CLK_TCK is ~always `100`, so pure integer division works.
        // Other values may lose precision here.
        let ms_per_tick = 1_000 / clock_ticks_per_sec;
        if clock_ticks_per_sec != 100 {
            warn!(
                clock_ticks_per_sec,
                ms_per_tick, "Unexpected value; process_cpu_seconds_total may be inaccurate."
            );
        }
        ms_per_tick
    })
}
/// Reads the `Stat` record of the current process.
///
/// # Errors
///
/// Fails if the current process cannot be opened or its stat cannot be read.
pub fn blocking_stat() -> ProcResult<Stat> {
    let myself = Process::myself()?;
    myself.stat()
}
/// Counts the entries of `/proc/<pid>/fd` that are not directories.
///
/// # Errors
///
/// Fails if the directory cannot be listed, or if an entry or its file type
/// cannot be read.
pub fn open_fds(pid: pid_t) -> io::Result<u64> {
    let fd_dir = format!("/proc/{}/fd", pid);
    fs::read_dir(fd_dir)?.try_fold(0u64, |count, entry| -> io::Result<u64> {
        let kind = entry?.file_type()?;
        Ok(if kind.is_dir() { count } else { count + 1 })
    })
}
/// Returns the open-file limit of the current process.
///
/// The soft limit is used when it has a value; otherwise the hard limit is
/// used. If both are unlimited, `0` is returned.
///
/// # Errors
///
/// Fails if the current process or its limits cannot be read.
pub fn max_fds() -> ProcResult<u64> {
    let nofile = Process::myself()?.limits()?.max_open_files;
    let max = match (nofile.soft_limit, nofile.hard_limit) {
        (LimitValue::Value(soft), _) => soft,
        (LimitValue::Unlimited, LimitValue::Value(hard)) => hard,
        (LimitValue::Unlimited, LimitValue::Unlimited) => 0,
    };
    Ok(max)
}
/// Calls `libc::sysconf` for `num` and returns the value as a `u64`.
///
/// `name` is only used in the log message emitted on failure.
///
/// # Errors
///
/// Any non-positive return value is treated as a failure: the last OS error is
/// logged and returned.
// The crate denies `unsafe_code`; this FFI call is the one deliberate exception.
#[allow(unsafe_code)]
fn sysconf(num: libc::c_int, name: &'static str) -> Result<u64, io::Error> {
    // SAFETY: the call passes a single integer by value and receives an integer
    // back; no pointers or caller-owned memory are involved.
    let value = unsafe { libc::sysconf(num) };

    if value <= 0 {
        let error = io::Error::last_os_error();
        error!("error getting {}: {:?}", name, error);
        return Err(error);
    }

    Ok(value as u64)
}