From 912b9d3b7c2fd274a657feeb9a45be0c6e63c3d5 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 20 Aug 2018 15:02:50 -0700 Subject: [PATCH] Split transport metrics into read and write halves (#74) In anticipation of removing Transport-related Event types, we want to separate the concerns of recording transport metrics updates from reporting them to the metrics endpoint. The transport module has been split into `Registry` and `Report` types: `Registry` is responsible for recording updates, and `Report` is responsible for rendering metrics. --- src/telemetry/metrics/mod.rs | 42 +++---- src/telemetry/metrics/record.rs | 31 +++-- src/telemetry/mod.rs | 9 +- .../transport.rs => transport/mod.rs} | 111 ++++++++++++------ 4 files changed, 113 insertions(+), 80 deletions(-) rename src/telemetry/{metrics/transport.rs => transport/mod.rs} (70%) diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index 1984c454d..c2e16771a 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -29,7 +29,7 @@ use std::default::Default; use std::fmt; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant}; mod counter; mod gauge; @@ -41,7 +41,6 @@ pub mod prom; mod record; mod scopes; mod serve; -mod transport; pub use self::counter::Counter; pub use self::gauge::Gauge; @@ -54,7 +53,7 @@ pub use self::prom::{FmtMetrics, FmtLabels, FmtMetric}; pub use self::record::Record; pub use self::scopes::Scopes; pub use self::serve::Serve; -pub use self::transport::Transports; +use super::transport; use super::process; use super::tls_config_reload; @@ -63,7 +62,7 @@ use super::tls_config_reload; struct Root { requests: http::RequestScopes, responses: http::ResponseScopes, - transports: Transports, + transports: transport::Report, tls_config_reload: tls_config_reload::Report, process: process::Report, } @@ -80,19 +79,27 @@ struct Stamped { /// is a Hyper service which can be used to create the server for the /// scrape endpoint, while the `Record` side can receive updates to the /// metrics by calling `record_event`. -pub fn new(start_time: SystemTime, idle_retain: Duration, tls: tls_config_reload::Report) - -> (Record, Serve) -{ - let metrics = Arc::new(Mutex::new(Root::new(start_time, tls))); - (Record::new(&metrics), Serve::new(&metrics, idle_retain)) +pub fn new( + idle_retain: Duration, + process: process::Report, + tls: tls_config_reload::Report +) -> (Record, Serve) { + let (transports, transports_report) = transport::new(); + let metrics = Arc::new(Mutex::new(Root::new(process, transports_report, tls))); + (Record::new(&metrics, transports), Serve::new(&metrics, idle_retain)) } // ===== impl Root ===== impl Root { - pub fn new(start_time: SystemTime, tls_config_reload: tls_config_reload::Report) -> Self { + fn new( + process: process::Report, + transports: transport::Report, + tls_config_reload: tls_config_reload::Report + ) -> Self { Self { - process: process::Report::new(start_time), + process, + transports, tls_config_reload, .. Root::default() } @@ -106,10 +113,6 @@ impl Root { self.responses.get_or_default(labels).stamped() } - fn transports(&mut self) -> &mut Transports { - &mut self.transports - } - fn retain_since(&mut self, epoch: Instant) { self.requests.retain(|_, v| v.stamp >= epoch); self.responses.retain(|_, v| v.stamp >= epoch); @@ -178,15 +181,8 @@ mod tests { ) { let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); let (req, rsp) = request("http://nba.com", &server, &client); - - let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); - root.transports.open(&client_transport); - root.request(RequestLabels::new(&req)).end(); root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); - - let duration = Duration::from_millis(15); - root.transports().close(&client_transport, transport::Eos::Clean, duration, 100, 200); } #[test] @@ -194,12 +190,10 @@ mod tests { let proxy = ctx::Proxy::Outbound; let server = server(proxy, TLS_DISABLED); - let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone())); let mut root = Root::default(); let t0 = Instant::now(); - root.transports().open(&server_transport); mock_route(&mut root, proxy, &server, "warriors"); let t1 = Instant::now(); diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/metrics/record.rs index 537e26852..dccefc5ff 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/metrics/record.rs @@ -12,13 +12,14 @@ use super::transport; #[derive(Clone, Debug)] pub struct Record { metrics: Arc>, + transports: transport::Registry, } // ===== impl Record ===== impl Record { - pub(super) fn new(metrics: &Arc>) -> Self { - Self { metrics: metrics.clone() } + pub(super) fn new(metrics: &Arc>, transports: transport::Registry) -> Self { + Self { metrics: metrics.clone(), transports } } #[inline] @@ -67,9 +68,7 @@ impl Record { }, Event::TransportOpen(ref ctx) => { - self.update(|metrics| { - metrics.transports().open(ctx); - }) + self.transports.open(ctx); }, Event::TransportClose(ref ctx, ref close) => { @@ -80,10 +79,8 @@ impl Record { errno: close.errno.map(|e| e.into()) } }; - self.update(|metrics| { - metrics.transports() - .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes); - }) + self.transports + .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes); }, }; } @@ -97,7 +94,7 @@ mod test { Event, }; use ctx::{self, test_util::*, transport::TlsStatus}; - use std::time::{Duration, Instant, SystemTime}; + use std::time::{Duration, Instant}; use conditional::Conditional; use tls; @@ -131,7 +128,7 @@ mod test { frames_sent: 0, }; - let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(100), Default::default()); + let (mut r, _) = metrics::new(Duration::from_secs(100), Default::default(), Default::default()); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); let labels = labels::ResponseLabels::new(&rsp, None); @@ -226,7 +223,7 @@ mod test { ), ]; - let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(1000), Default::default()); + let (mut r, _) = metrics::new(Duration::from_secs(1000), Default::default(), Default::default()); let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); @@ -235,11 +232,11 @@ mod test { assert_eq!(client_tls, rsp_labels.tls_status()); { - let mut lock = r.metrics.lock().expect("lock"); + let lock = r.metrics.lock().expect("lock"); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); - assert_eq!(lock.transports().open_total(&server_transport), 0); - assert_eq!(lock.transports().open_total(&client_transport), 0); + assert_eq!(r.transports.open_total(&server_transport), 0); + assert_eq!(r.transports.open_total(&client_transport), 0); } for e in &events { @@ -247,7 +244,7 @@ mod test { } { - let mut lock = r.metrics.lock().expect("lock"); + let lock = r.metrics.lock().expect("lock"); // === request scope ==================================== assert_eq!( @@ -273,7 +270,7 @@ mod test { use super::transport::Eos; let transport_duration: u64 = 30_000 * 1_000; - let t = lock.transports(); + let t = r.transports; assert_eq!(t.open_total(&server_transport), 1); assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321)); diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index b09537c09..23c356aee 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -22,6 +22,7 @@ mod process; pub mod sensor; pub mod tap; pub mod tls_config_reload; +mod transport; use self::errno::Errno; pub use self::event::Event; @@ -33,8 +34,10 @@ pub fn new( metrics_retain_idle: Duration, taps: &Arc>, ) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) { + let process = process::Report::new(start_time); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); - let (metrics_record, metrics_serve) = metrics::new(start_time, metrics_retain_idle, tls_config_fmt); - let s = Sensors::new(metrics_record, taps); - (s, tls_config_sensor, metrics_serve) + + let (record, serve) = metrics::new(metrics_retain_idle, process, tls_config_fmt); + let s = Sensors::new(record, taps); + (s, tls_config_sensor, serve) } diff --git a/src/telemetry/metrics/transport.rs b/src/telemetry/transport/mod.rs similarity index 70% rename from src/telemetry/metrics/transport.rs rename to src/telemetry/transport/mod.rs index f5a946cdd..46a0131af 100644 --- a/src/telemetry/metrics/transport.rs +++ b/src/telemetry/transport/mod.rs @@ -1,16 +1,17 @@ use indexmap::IndexMap; use std::fmt; +use std::sync::{Arc, Mutex}; use std::time::Duration; use ctx; -use super::{ +use telemetry::Errno; +use telemetry::metrics::{ latency, prom::{FmtLabels, FmtMetrics}, Counter, Gauge, Histogram, }; -use telemetry::Errno; metrics! { tcp_open_total: Counter { "Total count of opened connections" }, @@ -22,14 +23,22 @@ metrics! { tcp_connection_duration_ms: Histogram { "Connection lifetimes" } } -/// Holds all transport stats. -/// -/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. -#[derive(Debug, Default)] -pub struct Transports { - metrics: IndexMap, +pub fn new() -> (Registry, Report) { + let inner = Arc::new(Mutex::new(Inner::default())); + (Registry(inner.clone()), Report(inner)) } +/// Implements `FmtMetrics` to render prometheus-formatted metrics for all transports. +#[derive(Debug, Default)] +pub struct Report(Arc>); + +/// Supports recording telemetry metrics. +#[derive(Clone, Debug)] +pub struct Registry(Arc>); + +#[derive(Debug, Default)] +struct Inner(IndexMap); + /// Describes the dimensions across which transport metrics are aggregated. /// /// Implements `fmt::Display` to render a comma-separated list of key-value pairs. @@ -69,11 +78,39 @@ struct EosMetrics { connection_duration: Histogram, } -// ===== impl Transports ===== +impl Inner { + fn is_empty(&self) -> bool { + self.0.is_empty() + } -impl Transports { + /// Iterates over all metrics. + fn iter(&self) -> impl Iterator { + self.0.iter() + } + + /// Iterates over all end-of-stream metrics. + fn iter_eos(&self) -> impl Iterator { + self.iter() + .flat_map(|(k, t)| { + t.by_eos.iter().map(move |(e, m)| ((k ,e), m)) + }) + } + + fn get_or_default(&mut self, k: Key) -> &mut Metrics { + self.0.entry(k).or_insert_with(|| Metrics::default()) + } +} + +// ===== impl Registry ===== + +impl Registry { pub fn open(&mut self, ctx: &ctx::transport::Ctx) { - let metrics = self.metrics.entry(Key::new(ctx)).or_insert_with(|| Default::default()); + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; + + let metrics = inner.get_or_default(Key::new(ctx)); metrics.open_total.incr(); metrics.open_connections.incr(); } @@ -86,30 +123,27 @@ impl Transports { rx: u64, tx: u64, ) { - let key = Key::new(ctx); + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; - let metrics = self.metrics.entry(key).or_insert_with(|| Default::default()); + let key = Key::new(ctx); + let metrics = inner.get_or_default(key); metrics.open_connections.decr(); metrics.read_bytes_total += rx; metrics.write_bytes_total += tx; - let class = metrics.by_eos.entry(eos).or_insert_with(|| Default::default()); + let class = metrics.by_eos + .entry(eos) + .or_insert_with(|| EosMetrics::default()); class.close_total.incr(); class.connection_duration.add(duration); } - /// Iterates over all end-of-stream metrics. - fn iter_eos(&self) -> impl Iterator { - self.metrics - .iter() - .flat_map(|(k, t)| { - t.by_eos.iter().map(move |(e, m)| ((k ,e), m)) - }) - } - #[cfg(test)] pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { - self.metrics + self.0.lock().unwrap().0 .get(&Key::new(ctx)) .map(|m| m.open_total.into()) .unwrap_or(0) @@ -117,7 +151,7 @@ impl Transports { // #[cfg(test)] // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { - // self.metrics + // self.0.lock().unwrap().0 // .get(&Key::new(ctx)) // .map(|m| m.open_connections.into()) // .unwrap_or(0) @@ -125,7 +159,7 @@ impl Transports { #[cfg(test)] pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { - self.metrics + self.0.lock().unwrap().0 .get(&Key::new(ctx)) .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) .unwrap_or((0, 0)) @@ -133,7 +167,7 @@ impl Transports { #[cfg(test)] pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { - self.metrics + self.0.lock().unwrap().0 .get(&Key::new(ctx)) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into())) .unwrap_or(0) @@ -141,36 +175,41 @@ impl Transports { #[cfg(test)] pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { - self.metrics + self.0.lock().unwrap().0 .get(&Key::new(ctx)) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone())) .unwrap_or_default() } } -impl FmtMetrics for Transports { +impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.metrics.is_empty() { + let metrics = match self.0.lock() { + Err(_) => return Ok(()), + Ok(lock) => lock, + }; + + if metrics.is_empty() { return Ok(()); } tcp_open_total.fmt_help(f)?; - tcp_open_total.fmt_scopes(f, &self.metrics, |m| &m.open_total)?; + tcp_open_total.fmt_scopes(f, metrics.iter(), |m| &m.open_total)?; tcp_open_connections.fmt_help(f)?; - tcp_open_connections.fmt_scopes(f, &self.metrics, |m| &m.open_connections)?; + tcp_open_connections.fmt_scopes(f, metrics.iter(), |m| &m.open_connections)?; tcp_read_bytes_total.fmt_help(f)?; - tcp_read_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.read_bytes_total)?; + tcp_read_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.read_bytes_total)?; tcp_write_bytes_total.fmt_help(f)?; - tcp_write_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.write_bytes_total)?; + tcp_write_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.write_bytes_total)?; tcp_close_total.fmt_help(f)?; - tcp_close_total.fmt_scopes(f, self.iter_eos(), |e| &e.close_total)?; + tcp_close_total.fmt_scopes(f, metrics.iter_eos(), |e| &e.close_total)?; tcp_connection_duration_ms.fmt_help(f)?; - tcp_connection_duration_ms.fmt_scopes(f, self.iter_eos(), |e| &e.connection_duration)?; + tcp_connection_duration_ms.fmt_scopes(f, metrics.iter_eos(), |e| &e.connection_duration)?; Ok(()) }