diff --git a/src/telemetry/metrics/transport.rs b/src/telemetry/metrics/transport.rs index e4d91a723..2369b9d81 100644 --- a/src/telemetry/metrics/transport.rs +++ b/src/telemetry/metrics/transport.rs @@ -1,3 +1,4 @@ +use indexmap::IndexMap; use std::fmt; use std::time::Duration; @@ -8,7 +9,6 @@ use super::{ Counter, Gauge, Histogram, - Scopes, }; use telemetry::Errno; @@ -27,32 +27,28 @@ metrics! { /// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. #[derive(Debug, Default)] pub struct Transports { - opens: OpenScopes, - closes: CloseScopes, + metrics: IndexMap, } -type OpenScopes = Scopes; +/// Describes the dimensions across which transport metrics are aggregated. +/// +/// Implements `fmt::Display` to render a comma-separated list of key-value pairs. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +struct Key { + direction: Direction, + peer: Peer, + tls_status: TlsStatus, +} +/// Holds all of the metrics for a class of transport. #[derive(Debug, Default)] -struct OpenMetrics { +struct Metrics { open_total: Counter, open_connections: Gauge, write_bytes_total: Counter, read_bytes_total: Counter, -} -type CloseScopes = Scopes; - -/// Labels describing a TCP connection -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -struct TransportLabels { - /// Was the transport opened in the inbound or outbound direction? - direction: Direction, - - peer: Peer, - - /// Was the transport secured with TLS? - tls_status: TlsStatus, + by_eos: IndexMap, } #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] @@ -66,25 +62,20 @@ pub enum Eos { }, } -/// Labels describing the end of a TCP connection -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -struct TransportCloseLabels { - transport: TransportLabels, - eos: Eos, -} - +/// Holds metrics for a class of end-of-stream. #[derive(Debug, Default)] -struct CloseMetrics { +struct EosMetrics { close_total: Counter, connection_duration: Histogram, } +struct FmtEos<'a>(&'a Key, &'a Eos); + // ===== impl Transports ===== impl Transports { pub fn open(&mut self, ctx: &ctx::transport::Ctx) { - let k = TransportLabels::new(ctx); - let metrics = self.opens.get_or_default(k); + let metrics = self.metrics.entry(Key::new(ctx)).or_insert_with(|| Default::default()); metrics.open_total.incr(); metrics.open_connections.incr(); } @@ -97,23 +88,31 @@ impl Transports { rx: u64, tx: u64, ) { - let key = TransportLabels::new(ctx); + let key = Key::new(ctx); - let o = self.opens.get_or_default(key); - o.open_connections.decr(); - o.read_bytes_total += rx; - o.write_bytes_total += tx; + let metrics = self.metrics.entry(key).or_insert_with(|| Default::default()); + metrics.open_connections.decr(); + metrics.read_bytes_total += rx; + metrics.write_bytes_total += tx; - let k = TransportCloseLabels::new(key, eos); - let c = self.closes.get_or_default(k); - c.close_total.incr(); - c.connection_duration.add(duration); + let class = metrics.by_eos.entry(eos).or_insert_with(|| Default::default()); + class.close_total.incr(); + class.connection_duration.add(duration); + } + + /// Iterates over all end-of-stream metrics. + fn iter_eos(&self) -> impl Iterator { + self.metrics + .iter() + .flat_map(|(k, t)| { + t.by_eos.iter().map(move |(e, m)| (FmtEos(k ,e), m)) + }) } #[cfg(test)] pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { - self.opens - .get(&TransportLabels::new(ctx)) + self.metrics + .get(&Key::new(ctx)) .map(|m| m.open_total.into()) .unwrap_or(0) } @@ -121,92 +120,75 @@ impl Transports { // #[cfg(test)] // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { // self.metrics - // .get(&Key::from(ctx)) + // .get(&Key::new(ctx)) // .map(|m| m.open_connections.into()) // .unwrap_or(0) // } #[cfg(test)] pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { - self.opens - .get(&TransportLabels::new(ctx)) + self.metrics + .get(&Key::new(ctx)) .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) .unwrap_or((0, 0)) } #[cfg(test)] pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { - self.closes - .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) - .map(|m| m.close_total.into()) + self.metrics + .get(&Key::new(ctx)) + .and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into())) .unwrap_or(0) } #[cfg(test)] pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { - self.closes - .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) - .map(|m| m.connection_duration.clone()) + self.metrics + .get(&Key::new(ctx)) + .and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone())) .unwrap_or_default() } } impl fmt::Display for Transports { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.opens.fmt(f)?; - self.closes.fmt(f)?; - - Ok(()) - } -} - -// ===== impl OpenScopes ===== - -impl fmt::Display for OpenScopes { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { + if self.metrics.is_empty() { return Ok(()); } tcp_open_total.fmt_help(f)?; - tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; + tcp_open_total.fmt_scopes(f, &self.metrics, |m| &m.open_total)?; tcp_open_connections.fmt_help(f)?; - tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; + tcp_open_connections.fmt_scopes(f, &self.metrics, |m| &m.open_connections)?; tcp_read_bytes_total.fmt_help(f)?; - tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; + tcp_read_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.read_bytes_total)?; tcp_write_bytes_total.fmt_help(f)?; - tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; - - Ok(()) - } -} - -// ===== impl CloseScopes ===== - -impl fmt::Display for CloseScopes { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { - return Ok(()); - } + tcp_write_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.write_bytes_total)?; tcp_close_total.fmt_help(f)?; - tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; + tcp_close_total.fmt_scopes(f, self.iter_eos(), |e| &e.close_total)?; tcp_connection_duration_ms.fmt_help(f)?; - tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; + tcp_connection_duration_ms.fmt_scopes(f, self.iter_eos(), |e| &e.connection_duration)?; Ok(()) } } -// ===== impl TransportLabels ===== +// ===== impl Key ===== -impl TransportLabels { +impl fmt::Display for Key { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{},{},{}", self.direction, self.peer, self.tls_status) + } +} + +impl Key { fn new(ctx: &ctx::transport::Ctx) -> Self { - TransportLabels { + Self { direction: Direction::from_context(ctx.proxy().as_ref()), peer: match *ctx { ctx::transport::Ctx::Server(_) => Peer::Src, @@ -217,12 +199,6 @@ impl TransportLabels { } } -impl fmt::Display for TransportLabels { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{},{},{}", self.direction, self.peer, self.tls_status) - } -} - impl fmt::Display for Peer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { @@ -232,22 +208,6 @@ impl fmt::Display for Peer { } } -// ===== impl TransportCloseLabels ===== - -impl TransportCloseLabels { - fn new(transport: TransportLabels, eos: Eos) -> Self { - Self { - transport, - eos, - } - } -} - -impl fmt::Display for TransportCloseLabels { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{},{}", self.transport, self.eos) - } -} // ===== impl Eos ===== @@ -265,3 +225,11 @@ impl fmt::Display for Eos { } } } + +// ===== impl FmtEos ===== + +impl<'a> fmt::Display for FmtEos<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{},{}", self.0, self.1) + } +}