Split transport metrics into read and write halves (#74)

In anticipation of removing Transport-related Event types, we want to
separate the concerns of recording transport metrics updates from
reporting them to the metrics endpoint.

The transport module has been split into `Registry` and `Report` types:
`Registry` is responsible for recording updates, and `Report` is
responsible for rendering metrics.
This commit is contained in:
Oliver Gould 2018-08-20 15:02:50 -07:00 committed by GitHub
parent 6b4b35b083
commit 912b9d3b7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 80 deletions

View File

@ -29,7 +29,7 @@
use std::default::Default; use std::default::Default;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant};
mod counter; mod counter;
mod gauge; mod gauge;
@ -41,7 +41,6 @@ pub mod prom;
mod record; mod record;
mod scopes; mod scopes;
mod serve; mod serve;
mod transport;
pub use self::counter::Counter; pub use self::counter::Counter;
pub use self::gauge::Gauge; pub use self::gauge::Gauge;
@ -54,7 +53,7 @@ pub use self::prom::{FmtMetrics, FmtLabels, FmtMetric};
pub use self::record::Record; pub use self::record::Record;
pub use self::scopes::Scopes; pub use self::scopes::Scopes;
pub use self::serve::Serve; pub use self::serve::Serve;
pub use self::transport::Transports; use super::transport;
use super::process; use super::process;
use super::tls_config_reload; use super::tls_config_reload;
@ -63,7 +62,7 @@ use super::tls_config_reload;
struct Root { struct Root {
requests: http::RequestScopes, requests: http::RequestScopes,
responses: http::ResponseScopes, responses: http::ResponseScopes,
transports: Transports, transports: transport::Report,
tls_config_reload: tls_config_reload::Report, tls_config_reload: tls_config_reload::Report,
process: process::Report, process: process::Report,
} }
@ -80,19 +79,27 @@ struct Stamped<T> {
/// is a Hyper service which can be used to create the server for the /// is a Hyper service which can be used to create the server for the
/// scrape endpoint, while the `Record` side can receive updates to the /// scrape endpoint, while the `Record` side can receive updates to the
/// metrics by calling `record_event`. /// metrics by calling `record_event`.
pub fn new(start_time: SystemTime, idle_retain: Duration, tls: tls_config_reload::Report) pub fn new(
-> (Record, Serve) idle_retain: Duration,
{ process: process::Report,
let metrics = Arc::new(Mutex::new(Root::new(start_time, tls))); tls: tls_config_reload::Report
(Record::new(&metrics), Serve::new(&metrics, idle_retain)) ) -> (Record, Serve) {
let (transports, transports_report) = transport::new();
let metrics = Arc::new(Mutex::new(Root::new(process, transports_report, tls)));
(Record::new(&metrics, transports), Serve::new(&metrics, idle_retain))
} }
// ===== impl Root ===== // ===== impl Root =====
impl Root { impl Root {
pub fn new(start_time: SystemTime, tls_config_reload: tls_config_reload::Report) -> Self { fn new(
process: process::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report
) -> Self {
Self { Self {
process: process::Report::new(start_time), process,
transports,
tls_config_reload, tls_config_reload,
.. Root::default() .. Root::default()
} }
@ -106,10 +113,6 @@ impl Root {
self.responses.get_or_default(labels).stamped() self.responses.get_or_default(labels).stamped()
} }
fn transports(&mut self) -> &mut Transports {
&mut self.transports
}
fn retain_since(&mut self, epoch: Instant) { fn retain_since(&mut self, epoch: Instant) {
self.requests.retain(|_, v| v.stamp >= epoch); self.requests.retain(|_, v| v.stamp >= epoch);
self.responses.retain(|_, v| v.stamp >= epoch); self.responses.retain(|_, v| v.stamp >= epoch);
@ -178,15 +181,8 @@ mod tests {
) { ) {
let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED);
let (req, rsp) = request("http://nba.com", &server, &client); let (req, rsp) = request("http://nba.com", &server, &client);
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
root.transports.open(&client_transport);
root.request(RequestLabels::new(&req)).end(); root.request(RequestLabels::new(&req)).end();
root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
let duration = Duration::from_millis(15);
root.transports().close(&client_transport, transport::Eos::Clean, duration, 100, 200);
} }
#[test] #[test]
@ -194,12 +190,10 @@ mod tests {
let proxy = ctx::Proxy::Outbound; let proxy = ctx::Proxy::Outbound;
let server = server(proxy, TLS_DISABLED); let server = server(proxy, TLS_DISABLED);
let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone()));
let mut root = Root::default(); let mut root = Root::default();
let t0 = Instant::now(); let t0 = Instant::now();
root.transports().open(&server_transport);
mock_route(&mut root, proxy, &server, "warriors"); mock_route(&mut root, proxy, &server, "warriors");
let t1 = Instant::now(); let t1 = Instant::now();

View File

@ -12,13 +12,14 @@ use super::transport;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Record { pub struct Record {
metrics: Arc<Mutex<Root>>, metrics: Arc<Mutex<Root>>,
transports: transport::Registry,
} }
// ===== impl Record ===== // ===== impl Record =====
impl Record { impl Record {
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self { pub(super) fn new(metrics: &Arc<Mutex<Root>>, transports: transport::Registry) -> Self {
Self { metrics: metrics.clone() } Self { metrics: metrics.clone(), transports }
} }
#[inline] #[inline]
@ -67,9 +68,7 @@ impl Record {
}, },
Event::TransportOpen(ref ctx) => { Event::TransportOpen(ref ctx) => {
self.update(|metrics| { self.transports.open(ctx);
metrics.transports().open(ctx);
})
}, },
Event::TransportClose(ref ctx, ref close) => { Event::TransportClose(ref ctx, ref close) => {
@ -80,10 +79,8 @@ impl Record {
errno: close.errno.map(|e| e.into()) errno: close.errno.map(|e| e.into())
} }
}; };
self.update(|metrics| { self.transports
metrics.transports() .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes);
.close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes);
})
}, },
}; };
} }
@ -97,7 +94,7 @@ mod test {
Event, Event,
}; };
use ctx::{self, test_util::*, transport::TlsStatus}; use ctx::{self, test_util::*, transport::TlsStatus};
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant};
use conditional::Conditional; use conditional::Conditional;
use tls; use tls;
@ -131,7 +128,7 @@ mod test {
frames_sent: 0, frames_sent: 0,
}; };
let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(100), Default::default()); let (mut r, _) = metrics::new(Duration::from_secs(100), Default::default(), Default::default());
let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone());
let labels = labels::ResponseLabels::new(&rsp, None); let labels = labels::ResponseLabels::new(&rsp, None);
@ -226,7 +223,7 @@ mod test {
), ),
]; ];
let (mut r, _) = metrics::new(SystemTime::now(), Duration::from_secs(1000), Default::default()); let (mut r, _) = metrics::new(Duration::from_secs(1000), Default::default(), Default::default());
let req_labels = RequestLabels::new(&req); let req_labels = RequestLabels::new(&req);
let rsp_labels = ResponseLabels::new(&rsp, None); let rsp_labels = ResponseLabels::new(&rsp, None);
@ -235,11 +232,11 @@ mod test {
assert_eq!(client_tls, rsp_labels.tls_status()); assert_eq!(client_tls, rsp_labels.tls_status());
{ {
let mut lock = r.metrics.lock().expect("lock"); let lock = r.metrics.lock().expect("lock");
assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.requests.get(&req_labels).is_none());
assert!(lock.responses.get(&rsp_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none());
assert_eq!(lock.transports().open_total(&server_transport), 0); assert_eq!(r.transports.open_total(&server_transport), 0);
assert_eq!(lock.transports().open_total(&client_transport), 0); assert_eq!(r.transports.open_total(&client_transport), 0);
} }
for e in &events { for e in &events {
@ -247,7 +244,7 @@ mod test {
} }
{ {
let mut lock = r.metrics.lock().expect("lock"); let lock = r.metrics.lock().expect("lock");
// === request scope ==================================== // === request scope ====================================
assert_eq!( assert_eq!(
@ -273,7 +270,7 @@ mod test {
use super::transport::Eos; use super::transport::Eos;
let transport_duration: u64 = 30_000 * 1_000; let transport_duration: u64 = 30_000 * 1_000;
let t = lock.transports(); let t = r.transports;
assert_eq!(t.open_total(&server_transport), 1); assert_eq!(t.open_total(&server_transport), 1);
assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321)); assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321));

View File

@ -22,6 +22,7 @@ mod process;
pub mod sensor; pub mod sensor;
pub mod tap; pub mod tap;
pub mod tls_config_reload; pub mod tls_config_reload;
mod transport;
use self::errno::Errno; use self::errno::Errno;
pub use self::event::Event; pub use self::event::Event;
@ -33,8 +34,10 @@ pub fn new(
metrics_retain_idle: Duration, metrics_retain_idle: Duration,
taps: &Arc<Mutex<tap::Taps>>, taps: &Arc<Mutex<tap::Taps>>,
) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) { ) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) {
let process = process::Report::new(start_time);
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
let (metrics_record, metrics_serve) = metrics::new(start_time, metrics_retain_idle, tls_config_fmt);
let s = Sensors::new(metrics_record, taps); let (record, serve) = metrics::new(metrics_retain_idle, process, tls_config_fmt);
(s, tls_config_sensor, metrics_serve) let s = Sensors::new(record, taps);
(s, tls_config_sensor, serve)
} }

View File

@ -1,16 +1,17 @@
use indexmap::IndexMap; use indexmap::IndexMap;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use ctx; use ctx;
use super::{ use telemetry::Errno;
use telemetry::metrics::{
latency, latency,
prom::{FmtLabels, FmtMetrics}, prom::{FmtLabels, FmtMetrics},
Counter, Counter,
Gauge, Gauge,
Histogram, Histogram,
}; };
use telemetry::Errno;
metrics! { metrics! {
tcp_open_total: Counter { "Total count of opened connections" }, tcp_open_total: Counter { "Total count of opened connections" },
@ -22,14 +23,22 @@ metrics! {
tcp_connection_duration_ms: Histogram<latency::Ms> { "Connection lifetimes" } tcp_connection_duration_ms: Histogram<latency::Ms> { "Connection lifetimes" }
} }
/// Holds all transport stats. pub fn new() -> (Registry, Report) {
/// let inner = Arc::new(Mutex::new(Inner::default()));
/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. (Registry(inner.clone()), Report(inner))
#[derive(Debug, Default)]
pub struct Transports {
metrics: IndexMap<Key, Metrics>,
} }
/// Implements `FmtMetrics` to render prometheus-formatted metrics for all transports.
#[derive(Debug, Default)]
pub struct Report(Arc<Mutex<Inner>>);
/// Supports recording telemetry metrics.
#[derive(Clone, Debug)]
pub struct Registry(Arc<Mutex<Inner>>);
#[derive(Debug, Default)]
struct Inner(IndexMap<Key, Metrics>);
/// Describes the dimensions across which transport metrics are aggregated. /// Describes the dimensions across which transport metrics are aggregated.
/// ///
/// Implements `fmt::Display` to render a comma-separated list of key-value pairs. /// Implements `fmt::Display` to render a comma-separated list of key-value pairs.
@ -69,11 +78,39 @@ struct EosMetrics {
connection_duration: Histogram<latency::Ms>, connection_duration: Histogram<latency::Ms>,
} }
// ===== impl Transports ===== impl Inner {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
impl Transports { /// Iterates over all metrics.
fn iter(&self) -> impl Iterator<Item = (&Key, &Metrics)> {
self.0.iter()
}
/// Iterates over all end-of-stream metrics.
fn iter_eos(&self) -> impl Iterator<Item = ((&Key, &Eos), &EosMetrics)> {
self.iter()
.flat_map(|(k, t)| {
t.by_eos.iter().map(move |(e, m)| ((k ,e), m))
})
}
fn get_or_default(&mut self, k: Key) -> &mut Metrics {
self.0.entry(k).or_insert_with(|| Metrics::default())
}
}
// ===== impl Registry =====
impl Registry {
pub fn open(&mut self, ctx: &ctx::transport::Ctx) { pub fn open(&mut self, ctx: &ctx::transport::Ctx) {
let metrics = self.metrics.entry(Key::new(ctx)).or_insert_with(|| Default::default()); let mut inner = match self.0.lock() {
Err(_) => return,
Ok(lock) => lock,
};
let metrics = inner.get_or_default(Key::new(ctx));
metrics.open_total.incr(); metrics.open_total.incr();
metrics.open_connections.incr(); metrics.open_connections.incr();
} }
@ -86,30 +123,27 @@ impl Transports {
rx: u64, rx: u64,
tx: u64, tx: u64,
) { ) {
let key = Key::new(ctx); let mut inner = match self.0.lock() {
Err(_) => return,
Ok(lock) => lock,
};
let metrics = self.metrics.entry(key).or_insert_with(|| Default::default()); let key = Key::new(ctx);
let metrics = inner.get_or_default(key);
metrics.open_connections.decr(); metrics.open_connections.decr();
metrics.read_bytes_total += rx; metrics.read_bytes_total += rx;
metrics.write_bytes_total += tx; metrics.write_bytes_total += tx;
let class = metrics.by_eos.entry(eos).or_insert_with(|| Default::default()); let class = metrics.by_eos
.entry(eos)
.or_insert_with(|| EosMetrics::default());
class.close_total.incr(); class.close_total.incr();
class.connection_duration.add(duration); class.connection_duration.add(duration);
} }
/// Iterates over all end-of-stream metrics.
fn iter_eos(&self) -> impl Iterator<Item = ((&Key, &Eos), &EosMetrics)> {
self.metrics
.iter()
.flat_map(|(k, t)| {
t.by_eos.iter().map(move |(e, m)| ((k ,e), m))
})
}
#[cfg(test)] #[cfg(test)]
pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
self.metrics self.0.lock().unwrap().0
.get(&Key::new(ctx)) .get(&Key::new(ctx))
.map(|m| m.open_total.into()) .map(|m| m.open_total.into())
.unwrap_or(0) .unwrap_or(0)
@ -117,7 +151,7 @@ impl Transports {
// #[cfg(test)] // #[cfg(test)]
// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
// self.metrics // self.0.lock().unwrap().0
// .get(&Key::new(ctx)) // .get(&Key::new(ctx))
// .map(|m| m.open_connections.into()) // .map(|m| m.open_connections.into())
// .unwrap_or(0) // .unwrap_or(0)
@ -125,7 +159,7 @@ impl Transports {
#[cfg(test)] #[cfg(test)]
pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
self.metrics self.0.lock().unwrap().0
.get(&Key::new(ctx)) .get(&Key::new(ctx))
.map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into()))
.unwrap_or((0, 0)) .unwrap_or((0, 0))
@ -133,7 +167,7 @@ impl Transports {
#[cfg(test)] #[cfg(test)]
pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
self.metrics self.0.lock().unwrap().0
.get(&Key::new(ctx)) .get(&Key::new(ctx))
.and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into())) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into()))
.unwrap_or(0) .unwrap_or(0)
@ -141,36 +175,41 @@ impl Transports {
#[cfg(test)] #[cfg(test)]
pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> { pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
self.metrics self.0.lock().unwrap().0
.get(&Key::new(ctx)) .get(&Key::new(ctx))
.and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone())) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone()))
.unwrap_or_default() .unwrap_or_default()
} }
} }
impl FmtMetrics for Transports { impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.metrics.is_empty() { let metrics = match self.0.lock() {
Err(_) => return Ok(()),
Ok(lock) => lock,
};
if metrics.is_empty() {
return Ok(()); return Ok(());
} }
tcp_open_total.fmt_help(f)?; tcp_open_total.fmt_help(f)?;
tcp_open_total.fmt_scopes(f, &self.metrics, |m| &m.open_total)?; tcp_open_total.fmt_scopes(f, metrics.iter(), |m| &m.open_total)?;
tcp_open_connections.fmt_help(f)?; tcp_open_connections.fmt_help(f)?;
tcp_open_connections.fmt_scopes(f, &self.metrics, |m| &m.open_connections)?; tcp_open_connections.fmt_scopes(f, metrics.iter(), |m| &m.open_connections)?;
tcp_read_bytes_total.fmt_help(f)?; tcp_read_bytes_total.fmt_help(f)?;
tcp_read_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.read_bytes_total)?; tcp_read_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.read_bytes_total)?;
tcp_write_bytes_total.fmt_help(f)?; tcp_write_bytes_total.fmt_help(f)?;
tcp_write_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.write_bytes_total)?; tcp_write_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.write_bytes_total)?;
tcp_close_total.fmt_help(f)?; tcp_close_total.fmt_help(f)?;
tcp_close_total.fmt_scopes(f, self.iter_eos(), |e| &e.close_total)?; tcp_close_total.fmt_scopes(f, metrics.iter_eos(), |e| &e.close_total)?;
tcp_connection_duration_ms.fmt_help(f)?; tcp_connection_duration_ms.fmt_help(f)?;
tcp_connection_duration_ms.fmt_scopes(f, self.iter_eos(), |e| &e.connection_duration)?; tcp_connection_duration_ms.fmt_scopes(f, metrics.iter_eos(), |e| &e.connection_duration)?;
Ok(()) Ok(())
} }