Move metrics eviction into telemetry::http (#81)
The metrics server is responsible for evicting unused metrics, which seems like an unnecessary coupling with the storage implementation. This change moves this logic into `telemetry::http` so that the eviction strategy is specific to the implementation. Now that the metrics structure is shared internally to `http`, `Report`'s implementation of `FmtMetrics` can evict expired metrics. There are no functional changes.
This commit is contained in:
parent
dfcc0086b8
commit
d6d05905f1
|
@ -30,8 +30,12 @@ metrics! {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(taps: &Arc<Mutex<Taps>>) -> (Sensors, Report) {
|
pub fn new(metrics_retain_idle: Duration, taps: &Arc<Mutex<Taps>>) -> (Sensors, Report) {
|
||||||
let inner = Arc::new(Mutex::new(Inner::default()));
|
let inner = Arc::new(Mutex::new(Inner {
|
||||||
|
retain_idle: metrics_retain_idle,
|
||||||
|
.. Inner::default()
|
||||||
|
}));
|
||||||
|
|
||||||
let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps);
|
let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps);
|
||||||
(sensors, Report(inner))
|
(sensors, Report(inner))
|
||||||
}
|
}
|
||||||
|
@ -44,14 +48,12 @@ pub fn new(taps: &Arc<Mutex<Taps>>) -> (Sensors, Report) {
|
||||||
struct Registry(Arc<Mutex<Inner>>);
|
struct Registry(Arc<Mutex<Inner>>);
|
||||||
|
|
||||||
/// Reports HTTP metrics for prometheus.
|
/// Reports HTTP metrics for prometheus.
|
||||||
///
|
|
||||||
/// TODO retain_since should be done implicitly and should not be part of the
|
|
||||||
/// public interface.
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Report(Arc<Mutex<Inner>>);
|
pub struct Report(Arc<Mutex<Inner>>);
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
struct Inner {
|
struct Inner {
|
||||||
|
retain_idle: Duration,
|
||||||
requests: RequestScopes,
|
requests: RequestScopes,
|
||||||
responses: ResponseScopes,
|
responses: ResponseScopes,
|
||||||
}
|
}
|
||||||
|
@ -105,22 +107,27 @@ impl Registry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl Report =====
|
// ===== impl Inner =====
|
||||||
|
|
||||||
impl Report {
|
impl Inner {
|
||||||
pub(super) fn retain_since(&mut self, epoch: Instant) {
|
fn retain_since(&mut self, epoch: Instant) {
|
||||||
if let Ok(mut inner) = self.0.lock() {
|
self.requests.retain(|_, v| v.stamp >= epoch);
|
||||||
inner.requests.retain(|_, v| v.stamp >= epoch);
|
self.responses.retain(|_, v| v.stamp >= epoch);
|
||||||
inner.responses.retain(|_, v| v.stamp >= epoch);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===== impl Report =====
|
||||||
|
|
||||||
impl FmtMetrics for Report {
|
impl FmtMetrics for Report {
|
||||||
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
let now = Instant::now();
|
||||||
let inner = match self.0.lock() {
|
let inner = match self.0.lock() {
|
||||||
Err(_) => return Ok(()),
|
Err(_) => return Ok(()),
|
||||||
Ok(inner) => inner,
|
Ok(mut inner) => {
|
||||||
|
let epoch = now - inner.retain_idle;
|
||||||
|
inner.retain_since(epoch);
|
||||||
|
inner
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if !inner.requests.is_empty() {
|
if !inner.requests.is_empty() {
|
||||||
|
@ -236,7 +243,6 @@ mod tests {
|
||||||
|
|
||||||
let inner = Arc::new(Mutex::new(Inner::default()));
|
let inner = Arc::new(Mutex::new(Inner::default()));
|
||||||
let mut registry = Registry(inner.clone());
|
let mut registry = Registry(inner.clone());
|
||||||
let mut report = Report(inner.clone());
|
|
||||||
|
|
||||||
let t0 = Instant::now();
|
let t0 = Instant::now();
|
||||||
|
|
||||||
|
@ -246,31 +252,20 @@ mod tests {
|
||||||
mock_route(&mut registry, proxy, &server, "sixers");
|
mock_route(&mut registry, proxy, &server, "sixers");
|
||||||
let t2 = Instant::now();
|
let t2 = Instant::now();
|
||||||
|
|
||||||
{
|
let mut inner = inner.lock().unwrap();
|
||||||
let inner = inner.lock().unwrap();
|
assert_eq!(inner.requests.len(), 2);
|
||||||
assert_eq!(inner.requests.len(), 2);
|
assert_eq!(inner.responses.len(), 2);
|
||||||
assert_eq!(inner.responses.len(), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
report.retain_since(t0);
|
inner.retain_since(t0);
|
||||||
{
|
assert_eq!(inner.requests.len(), 2);
|
||||||
let inner = inner.lock().unwrap();
|
assert_eq!(inner.responses.len(), 2);
|
||||||
assert_eq!(inner.requests.len(), 2);
|
|
||||||
assert_eq!(inner.responses.len(), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
report.retain_since(t1);
|
inner.retain_since(t1);
|
||||||
{
|
assert_eq!(inner.requests.len(), 1);
|
||||||
let inner = inner.lock().unwrap();
|
assert_eq!(inner.responses.len(), 1);
|
||||||
assert_eq!(inner.requests.len(), 1);
|
|
||||||
assert_eq!(inner.responses.len(), 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
report.retain_since(t2);
|
inner.retain_since(t2);
|
||||||
{
|
assert_eq!(inner.requests.len(), 0);
|
||||||
let inner = inner.lock().unwrap();
|
assert_eq!(inner.responses.len(), 0);
|
||||||
assert_eq!(inner.requests.len(), 0);
|
|
||||||
assert_eq!(inner.responses.len(), 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
//! to worry about missing commas, double commas, or trailing commas at the
|
//! to worry about missing commas, double commas, or trailing commas at the
|
||||||
//! end of the label set (all of which will make Prometheus angry).
|
//! end of the label set (all of which will make Prometheus angry).
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
mod counter;
|
mod counter;
|
||||||
mod gauge;
|
mod gauge;
|
||||||
|
@ -70,11 +69,6 @@ impl Root {
|
||||||
process,
|
process,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO this should be moved into `http`
|
|
||||||
fn retain_since(&mut self, epoch: Instant) {
|
|
||||||
self.http.retain_since(epoch);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl Root =====
|
// ===== impl Root =====
|
||||||
|
|
|
@ -12,7 +12,6 @@ use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use tokio::executor::current_thread::TaskExecutor;
|
use tokio::executor::current_thread::TaskExecutor;
|
||||||
|
|
||||||
use super::{prom::FmtMetrics, Root};
|
use super::{prom::FmtMetrics, Root};
|
||||||
|
@ -23,7 +22,6 @@ use transport::BoundPort;
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Serve {
|
pub struct Serve {
|
||||||
metrics: Arc<Mutex<Root>>,
|
metrics: Arc<Mutex<Root>>,
|
||||||
idle_retain: Duration,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -35,11 +33,8 @@ enum ServeError {
|
||||||
// ===== impl Serve =====
|
// ===== impl Serve =====
|
||||||
|
|
||||||
impl Serve {
|
impl Serve {
|
||||||
pub fn new(metrics: &Arc<Mutex<Root>>, idle_retain: Duration) -> Self {
|
pub fn new(metrics: &Arc<Mutex<Root>>) -> Self {
|
||||||
Serve {
|
Self { metrics: metrics.clone() }
|
||||||
metrics: metrics.clone(),
|
|
||||||
idle_retain,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_gzip<B>(req: &Request<B>) -> bool {
|
fn is_gzip<B>(req: &Request<B>) -> bool {
|
||||||
|
@ -100,12 +95,9 @@ impl Service for Serve {
|
||||||
return future::ok(rsp);
|
return future::ok(rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut metrics = self.metrics.lock()
|
let metrics = self.metrics.lock()
|
||||||
.expect("metrics lock poisoned");
|
.expect("metrics lock poisoned");
|
||||||
|
|
||||||
metrics.retain_since(Instant::now() - self.idle_retain);
|
|
||||||
let metrics = metrics;
|
|
||||||
|
|
||||||
let resp = if Self::is_gzip(&req) {
|
let resp = if Self::is_gzip(&req) {
|
||||||
trace!("gzipping metrics");
|
trace!("gzipping metrics");
|
||||||
let mut writer = GzEncoder::new(Vec::<u8>::new(), CompressionOptions::fast());
|
let mut writer = GzEncoder::new(Vec::<u8>::new(), CompressionOptions::fast());
|
||||||
|
|
|
@ -34,7 +34,7 @@ pub fn new(
|
||||||
taps: &Arc<Mutex<tap::Taps>>,
|
taps: &Arc<Mutex<tap::Taps>>,
|
||||||
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
|
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
|
||||||
let process = process::Report::new(start_time);
|
let process = process::Report::new(start_time);
|
||||||
let (http_sensors, http_report) = http::new(taps);
|
let (http_sensors, http_report) = http::new(metrics_retain_idle, taps);
|
||||||
let (transport_registry, transport_report) = transport::new();
|
let (transport_registry, transport_report) = transport::new();
|
||||||
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
|
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
|
||||||
|
|
||||||
|
@ -44,6 +44,6 @@ pub fn new(
|
||||||
tls_config_fmt,
|
tls_config_fmt,
|
||||||
process,
|
process,
|
||||||
)));
|
)));
|
||||||
let serve = ServeMetrics::new(&report, metrics_retain_idle);
|
let serve = ServeMetrics::new(&report);
|
||||||
(http_sensors, transport_registry, tls_config_sensor, serve)
|
(http_sensors, transport_registry, tls_config_sensor, serve)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue