From d6d05905f1511ebb4d6225a4223cc86a6a52d87f Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 22 Aug 2018 16:20:51 -0700 Subject: [PATCH] Move metrics eviction into telemetry::http (#81) The metrics server is responsible for evicting unused metrics, which seems like an unnecessary coupling with the storage implementation. This change moves this logic into `telemetry::http` so that the eviction strategy is specific to the implementation. Now that the metrics structure is shared internally to `http`, `Report`'s implementation of `FmtMetrics` can evict expired metrics. There are no functional changes. --- src/telemetry/http/mod.rs | 69 ++++++++++++++++------------------ src/telemetry/metrics/mod.rs | 6 --- src/telemetry/metrics/serve.rs | 14 ++----- src/telemetry/mod.rs | 4 +- 4 files changed, 37 insertions(+), 56 deletions(-) diff --git a/src/telemetry/http/mod.rs b/src/telemetry/http/mod.rs index 333e6ce71..ea1f393c8 100644 --- a/src/telemetry/http/mod.rs +++ b/src/telemetry/http/mod.rs @@ -30,8 +30,12 @@ metrics! { } } -pub fn new(taps: &Arc>) -> (Sensors, Report) { - let inner = Arc::new(Mutex::new(Inner::default())); +pub fn new(metrics_retain_idle: Duration, taps: &Arc>) -> (Sensors, Report) { + let inner = Arc::new(Mutex::new(Inner { + retain_idle: metrics_retain_idle, + .. Inner::default() + })); + let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps); (sensors, Report(inner)) } @@ -44,14 +48,12 @@ pub fn new(taps: &Arc>) -> (Sensors, Report) { struct Registry(Arc>); /// Reports HTTP metrics for prometheus. -/// -/// TODO retain_since should be done implicitly and should not be part of the -/// public interface. #[derive(Clone, Debug)] pub struct Report(Arc>); #[derive(Debug, Default)] struct Inner { + retain_idle: Duration, requests: RequestScopes, responses: ResponseScopes, } @@ -105,22 +107,27 @@ impl Registry { } } -// ===== impl Report ===== +// ===== impl Inner ===== -impl Report { - pub(super) fn retain_since(&mut self, epoch: Instant) { - if let Ok(mut inner) = self.0.lock() { - inner.requests.retain(|_, v| v.stamp >= epoch); - inner.responses.retain(|_, v| v.stamp >= epoch); - } +impl Inner { + fn retain_since(&mut self, epoch: Instant) { + self.requests.retain(|_, v| v.stamp >= epoch); + self.responses.retain(|_, v| v.stamp >= epoch); } } +// ===== impl Report ===== + impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { + let now = Instant::now(); let inner = match self.0.lock() { Err(_) => return Ok(()), - Ok(inner) => inner, + Ok(mut inner) => { + let epoch = now - inner.retain_idle; + inner.retain_since(epoch); + inner + } }; if !inner.requests.is_empty() { @@ -236,7 +243,6 @@ mod tests { let inner = Arc::new(Mutex::new(Inner::default())); let mut registry = Registry(inner.clone()); - let mut report = Report(inner.clone()); let t0 = Instant::now(); @@ -246,31 +252,20 @@ mod tests { mock_route(&mut registry, proxy, &server, "sixers"); let t2 = Instant::now(); - { - let inner = inner.lock().unwrap(); - assert_eq!(inner.requests.len(), 2); - assert_eq!(inner.responses.len(), 2); - } + let mut inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); - report.retain_since(t0); - { - let inner = inner.lock().unwrap(); - assert_eq!(inner.requests.len(), 2); - assert_eq!(inner.responses.len(), 2); - } + inner.retain_since(t0); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); - report.retain_since(t1); - { - let inner = inner.lock().unwrap(); - assert_eq!(inner.requests.len(), 1); - assert_eq!(inner.responses.len(), 1); - } + inner.retain_since(t1); + assert_eq!(inner.requests.len(), 1); + assert_eq!(inner.responses.len(), 1); - report.retain_since(t2); - { - let inner = inner.lock().unwrap(); - assert_eq!(inner.requests.len(), 0); - assert_eq!(inner.responses.len(), 0); - } + inner.retain_since(t2); + assert_eq!(inner.requests.len(), 0); + assert_eq!(inner.responses.len(), 0); } } diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index b5ab89dd6..73a3fbe05 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -27,7 +27,6 @@ //! to worry about missing commas, double commas, or trailing commas at the //! end of the label set (all of which will make Prometheus angry). use std::fmt; -use std::time::Instant; mod counter; mod gauge; @@ -70,11 +69,6 @@ impl Root { process, } } - - // TODO this should be moved into `http` - fn retain_since(&mut self, epoch: Instant) { - self.http.retain_since(epoch); - } } // ===== impl Root ===== diff --git a/src/telemetry/metrics/serve.rs b/src/telemetry/metrics/serve.rs index 51b7d583f..14ace7576 100644 --- a/src/telemetry/metrics/serve.rs +++ b/src/telemetry/metrics/serve.rs @@ -12,7 +12,6 @@ use std::error::Error; use std::fmt; use std::io::{self, Write}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; use tokio::executor::current_thread::TaskExecutor; use super::{prom::FmtMetrics, Root}; @@ -23,7 +22,6 @@ use transport::BoundPort; #[derive(Debug, Clone)] pub struct Serve { metrics: Arc>, - idle_retain: Duration, } #[derive(Debug)] @@ -35,11 +33,8 @@ enum ServeError { // ===== impl Serve ===== impl Serve { - pub fn new(metrics: &Arc>, idle_retain: Duration) -> Self { - Serve { - metrics: metrics.clone(), - idle_retain, - } + pub fn new(metrics: &Arc>) -> Self { + Self { metrics: metrics.clone() } } fn is_gzip(req: &Request) -> bool { @@ -100,12 +95,9 @@ impl Service for Serve { return future::ok(rsp); } - let mut metrics = self.metrics.lock() + let metrics = self.metrics.lock() .expect("metrics lock poisoned"); - metrics.retain_since(Instant::now() - self.idle_retain); - let metrics = metrics; - let resp = if Self::is_gzip(&req) { trace!("gzipping metrics"); let mut writer = GzEncoder::new(Vec::::new(), CompressionOptions::fast()); diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index ee0ca5fb1..78fa02df2 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -34,7 +34,7 @@ pub fn new( taps: &Arc>, ) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) { let process = process::Report::new(start_time); - let (http_sensors, http_report) = http::new(taps); + let (http_sensors, http_report) = http::new(metrics_retain_idle, taps); let (transport_registry, transport_report) = transport::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); @@ -44,6 +44,6 @@ pub fn new( tls_config_fmt, process, ))); - let serve = ServeMetrics::new(&report, metrics_retain_idle); + let serve = ServeMetrics::new(&report); (http_sensors, transport_registry, tls_config_sensor, serve) }