diff --git a/src/telemetry/metrics/labels.rs b/src/telemetry/metrics/labels.rs index 5c1d2d52e..54cd3251b 100644 --- a/src/telemetry/metrics/labels.rs +++ b/src/telemetry/metrics/labels.rs @@ -9,7 +9,6 @@ use http; use ctx; use conditional::Conditional; -use telemetry::event; use transport::tls; #[derive(Clone, Debug, Eq, PartialEq, Hash)] @@ -192,15 +191,6 @@ impl Classification { grpc_status.map(Classification::grpc_status) .unwrap_or_else(|| Classification::http_status(&rsp.status)) } - - pub fn transport_close(close: &event::TransportClose) -> Self { - if close.clean { - Classification::Success - } else { - Classification::Failure - } - } - } impl fmt::Display for Classification { diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index d660c70b5..2d6524f92 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -51,11 +51,11 @@ use self::labels::{ RequestLabels, ResponseLabels, }; -use self::transport::{TransportLabels, TransportCloseLabels}; pub use self::labels::DstLabels; pub use self::record::Record; pub use self::scopes::Scopes; pub use self::serve::Serve; +pub use self::transport::Transports; use super::process; use super::tls_config_reload; @@ -92,8 +92,7 @@ pub struct Metric<'a, M: FmtMetric> { struct Root { requests: http::RequestScopes, responses: http::ResponseScopes, - transports: transport::OpenScopes, - transport_closes: transport::CloseScopes, + transports: Transports, tls_config_reload: tls_config_reload::Report, process: process::Report, } @@ -171,12 +170,8 @@ impl Root { self.responses.get_or_default(labels).stamped() } - fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics { - self.transports.get_or_default(labels) - } - - fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics { - self.transport_closes.get_or_default(labels) + fn transports(&mut self) -> &mut Transports { + &mut self.transports } fn retain_since(&mut self, epoch: Instant) { @@ -190,7 +185,6 @@ impl fmt::Display for Root { self.requests.fmt(f)?; self.responses.fmt(f)?; self.transports.fmt(f)?; - self.transport_closes.fmt(f)?; self.tls_config_reload.fmt(f)?; self.process.fmt(f)?; @@ -232,7 +226,6 @@ impl ::std::ops::Deref for Stamped { #[cfg(test)] mod tests { use ctx::test_util::*; - use telemetry::event; use super::*; use conditional::Conditional; use tls; @@ -250,22 +243,14 @@ mod tests { let (req, rsp) = request("http://nba.com", &server, &client); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); - let transport = TransportLabels::new(&client_transport); - root.transport(transport.clone()).open(); + root.transports.open(&client_transport); root.request(RequestLabels::new(&req)).end(); root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); - root.transport(transport).close(100, 200); - let end = TransportCloseLabels::new(&client_transport, &event::TransportClose { - clean: true, - errno: None, - duration: Duration::from_millis(15), - rx_bytes: 40, - tx_bytes: 0, - }); - root.transport_close(end).close(Duration::from_millis(15)); - } + let duration = Duration::from_millis(15); + root.transports().close(&client_transport, transport::Eos::Clean, duration, 100, 200); + } #[test] fn expiry() { @@ -278,7 +263,7 @@ mod tests { let mut root = Root::default(); let t0 = Instant::now(); - root.transport(TransportLabels::new(&server_transport)).open(); + root.transports().open(&server_transport); mock_route(&mut root, &proxy, &server, "warriors"); let t1 = Instant::now(); @@ -288,25 +273,17 @@ mod tests { assert_eq!(root.requests.len(), 2); assert_eq!(root.responses.len(), 2); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); root.retain_since(t0); assert_eq!(root.requests.len(), 2); assert_eq!(root.responses.len(), 2); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); root.retain_since(t1); assert_eq!(root.requests.len(), 1); assert_eq!(root.responses.len(), 1); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); root.retain_since(t2); assert_eq!(root.requests.len(), 0); assert_eq!(root.responses.len(), 0); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); } } diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/metrics/record.rs index 3a97c3f50..1f2a63242 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/metrics/record.rs @@ -6,7 +6,7 @@ use super::labels::{ RequestLabels, ResponseLabels, }; -use super::transport::{TransportLabels, TransportCloseLabels}; +use super::transport; /// Tracks Prometheus metrics #[derive(Clone, Debug)] @@ -68,17 +68,21 @@ impl Record { Event::TransportOpen(ref ctx) => { self.update(|metrics| { - metrics.transport(TransportLabels::new(ctx)).open(); + metrics.transports().open(ctx); }) }, Event::TransportClose(ref ctx, ref close) => { + let eos = if close.clean { + transport::Eos::Clean + } else { + transport::Eos::Error { + errno: close.errno.map(|e| e.into()) + } + }; self.update(|metrics| { - metrics.transport(TransportLabels::new(ctx)) - .close(close.rx_bytes, close.tx_bytes); - - metrics.transport_close(TransportCloseLabels::new(ctx, close)) - .close(close.duration); + metrics.transports() + .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes); }) }, }; @@ -89,7 +93,7 @@ impl Record { mod test { use telemetry::{ event, - metrics::{self, labels, transport::{TransportLabels, TransportCloseLabels}}, + metrics::{self, labels}, Event, }; use ctx::{self, test_util::*, transport::TlsStatus}; @@ -228,33 +232,16 @@ mod test { let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); - let srv_open_labels = TransportLabels::new(&server_transport); - let srv_close_labels = TransportCloseLabels::new( - &ctx::transport::Ctx::Server(server.clone()), - &transport_close, - ); - let client_open_labels = TransportLabels::new(&client_transport); - let client_close_labels = TransportCloseLabels::new( - &ctx::transport::Ctx::Client(client.clone()), - &transport_close, - ); assert_eq!(client_tls, req_labels.tls_status().into()); assert_eq!(client_tls, rsp_labels.tls_status().into()); - assert_eq!(client_tls, client_open_labels.tls_status().into()); - assert_eq!(client_tls, client_close_labels.tls_status().into()); - assert_eq!(server_tls, srv_open_labels.tls_status().into()); - assert_eq!(server_tls, srv_close_labels.tls_status().into()); { - let lock = r.metrics.lock() - .expect("lock"); + let mut lock = r.metrics.lock().expect("lock"); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); - assert!(lock.transports.get(&srv_open_labels).is_none()); - assert!(lock.transports.get(&client_open_labels).is_none()); - assert!(lock.transport_closes.get(&srv_close_labels).is_none()); - assert!(lock.transport_closes.get(&client_close_labels).is_none()); + assert_eq!(lock.transports().open_total(&server_transport), 0); + assert_eq!(lock.transports().open_total(&client_transport), 0); } for e in &events { @@ -262,8 +249,7 @@ mod test { } { - let lock = r.metrics.lock() - .expect("lock"); + let mut lock = r.metrics.lock().expect("lock"); // === request scope ==================================== assert_eq!( @@ -274,55 +260,35 @@ mod test { ); // === response scope =================================== - let response_scope = lock - .responses - .get(&rsp_labels) - .expect("response scope missing"); - assert_eq!(response_scope.total(), 1); + { + let response_scope = lock + .responses + .get(&rsp_labels) + .expect("response scope missing"); + assert_eq!(response_scope.total(), 1); - response_scope.latency() - .assert_bucket_exactly(200, 1) - .assert_gt_exactly(200, 0) - .assert_lt_exactly(200, 0); - - // === server transport open scope ====================== - let srv_transport_scope = lock - .transports - .get(&srv_open_labels) - .expect("server transport scope missing"); - assert_eq!(srv_transport_scope.open_total(), 1); - assert_eq!(srv_transport_scope.write_bytes_total(), 4321); - assert_eq!(srv_transport_scope.read_bytes_total(), 4321); - - // === client transport open scope ====================== - let client_transport_scope = lock - .transports - .get(&client_open_labels) - .expect("client transport scope missing"); - assert_eq!(client_transport_scope.open_total(), 1); - assert_eq!(client_transport_scope.write_bytes_total(), 4321); - assert_eq!(client_transport_scope.read_bytes_total(), 4321); + response_scope.latency() + .assert_bucket_exactly(200, 1) + .assert_gt_exactly(200, 0) + .assert_lt_exactly(200, 0); + } + use super::transport::Eos; let transport_duration: u64 = 30_000 * 1_000; + let t = lock.transports(); - // === server transport close scope ===================== - let srv_transport_close_scope = lock - .transport_closes - .get(&srv_close_labels) - .expect("server transport close scope missing"); - assert_eq!(srv_transport_close_scope.close_total(), 1); - srv_transport_close_scope.connection_duration() + assert_eq!(t.open_total(&server_transport), 1); + assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321)); + assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); + t.connection_durations(&server_transport, Eos::Clean) .assert_bucket_exactly(transport_duration, 1) .assert_gt_exactly(transport_duration, 0) .assert_lt_exactly(transport_duration, 0); - // === client transport close scope ===================== - let client_transport_close_scope = lock - .transport_closes - .get(&client_close_labels) - .expect("client transport close scope missing"); - assert_eq!(client_transport_close_scope.close_total(), 1); - client_transport_close_scope.connection_duration() + assert_eq!(t.open_total(&client_transport), 1); + assert_eq!(t.rx_tx_bytes_total(&client_transport), (4321, 4321)); + assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); + t.connection_durations(&server_transport, Eos::Clean) .assert_bucket_exactly(transport_duration, 1) .assert_gt_exactly(transport_duration, 0) .assert_lt_exactly(transport_duration, 0); diff --git a/src/telemetry/metrics/transport.rs b/src/telemetry/metrics/transport.rs index 71e75af7b..e4d91a723 100644 --- a/src/telemetry/metrics/transport.rs +++ b/src/telemetry/metrics/transport.rs @@ -3,30 +3,49 @@ use std::time::Duration; use ctx; use super::{ - labels::{Classification, Direction, TlsStatus}, + labels::{Direction, TlsStatus}, latency, Counter, Gauge, Histogram, Scopes, }; -use telemetry::{event, Errno}; +use telemetry::Errno; -pub(super) type OpenScopes = Scopes; +metrics! { + tcp_open_total: Counter { "Total count of opened connections" }, + tcp_open_connections: Gauge { "Number of currently-open connections" }, + tcp_read_bytes_total: Counter { "Total count of bytes read from peers" }, + tcp_write_bytes_total: Counter { "Total count of bytes written to peers" }, + + tcp_close_total: Counter { "Total count of closed connections" }, + tcp_connection_duration_ms: Histogram { "Connection lifetimes" } +} + +/// Holds all transport stats. +/// +/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. +#[derive(Debug, Default)] +pub struct Transports { + opens: OpenScopes, + closes: CloseScopes, +} + +type OpenScopes = Scopes; #[derive(Debug, Default)] -pub(super) struct OpenMetrics { +struct OpenMetrics { open_total: Counter, open_connections: Gauge, write_bytes_total: Counter, read_bytes_total: Counter, } -pub(super) type CloseScopes = Scopes; +type CloseScopes = Scopes; /// Labels describing a TCP connection #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct TransportLabels { +struct TransportLabels { /// Was the transport opened in the inbound or outbound direction? direction: Direction, @@ -37,141 +56,156 @@ pub struct TransportLabels { } #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub enum Peer { Src, Dst } +enum Peer { Src, Dst } + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub enum Eos { + Clean, + Error { + errno: Option, + }, +} /// Labels describing the end of a TCP connection #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct TransportCloseLabels { - /// Labels describing the TCP connection that closed. - pub(super) transport: TransportLabels, - - /// Was the transport closed successfully? - classification: Classification, - - /// If `classification` == `Failure`, this may be set with the - /// OS error number describing the error, if there was one. - /// Otherwise, it should be `None`. - errno: Option, +struct TransportCloseLabels { + transport: TransportLabels, + eos: Eos, } #[derive(Debug, Default)] -pub(super) struct CloseMetrics { +struct CloseMetrics { close_total: Counter, connection_duration: Histogram, } -// ===== impl OpenScopes ===== +// ===== impl Transports ===== -impl OpenScopes { - metrics! { - tcp_open_total: Counter { "Total count of opened connections" }, - tcp_open_connections: Gauge { "Number of currently-open connections" }, - tcp_read_bytes_total: Counter { "Total count of bytes read from peers" }, - tcp_write_bytes_total: Counter { "Total count of bytes written to peers" } +impl Transports { + pub fn open(&mut self, ctx: &ctx::transport::Ctx) { + let k = TransportLabels::new(ctx); + let metrics = self.opens.get_or_default(k); + metrics.open_total.incr(); + metrics.open_connections.incr(); + } + + pub fn close( + &mut self, + ctx: &ctx::transport::Ctx, + eos: Eos, + duration: Duration, + rx: u64, + tx: u64, + ) { + let key = TransportLabels::new(ctx); + + let o = self.opens.get_or_default(key); + o.open_connections.decr(); + o.read_bytes_total += rx; + o.write_bytes_total += tx; + + let k = TransportCloseLabels::new(key, eos); + let c = self.closes.get_or_default(k); + c.close_total.incr(); + c.connection_duration.add(duration); + } + + #[cfg(test)] + pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { + self.opens + .get(&TransportLabels::new(ctx)) + .map(|m| m.open_total.into()) + .unwrap_or(0) + } + + // #[cfg(test)] + // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { + // self.metrics + // .get(&Key::from(ctx)) + // .map(|m| m.open_connections.into()) + // .unwrap_or(0) + // } + + #[cfg(test)] + pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { + self.opens + .get(&TransportLabels::new(ctx)) + .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) + .unwrap_or((0, 0)) + } + + #[cfg(test)] + pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { + self.closes + .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) + .map(|m| m.close_total.into()) + .unwrap_or(0) + } + + #[cfg(test)] + pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { + self.closes + .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) + .map(|m| m.connection_duration.clone()) + .unwrap_or_default() } } +impl fmt::Display for Transports { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.opens.fmt(f)?; + self.closes.fmt(f)?; + + Ok(()) + } +} + +// ===== impl OpenScopes ===== + impl fmt::Display for OpenScopes { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.is_empty() { return Ok(()); } - Self::tcp_open_total.fmt_help(f)?; - Self::tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; + tcp_open_total.fmt_help(f)?; + tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; - Self::tcp_open_connections.fmt_help(f)?; - Self::tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; + tcp_open_connections.fmt_help(f)?; + tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; - Self::tcp_read_bytes_total.fmt_help(f)?; - Self::tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; + tcp_read_bytes_total.fmt_help(f)?; + tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; - Self::tcp_write_bytes_total.fmt_help(f)?; - Self::tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; + tcp_write_bytes_total.fmt_help(f)?; + tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; Ok(()) } } -// ===== impl OpenMetrics ===== - -impl OpenMetrics { - pub(super) fn open(&mut self) { - self.open_total.incr(); - self.open_connections.incr(); - } - - pub(super) fn close(&mut self, rx: u64, tx: u64) { - self.open_connections.decr(); - self.read_bytes_total += rx; - self.write_bytes_total += tx; - } - - #[cfg(test)] - pub(super) fn open_total(&self) -> u64 { - self.open_total.into() - } - - #[cfg(test)] - pub(super) fn read_bytes_total(&self) -> u64 { - self.read_bytes_total.into() - } - - #[cfg(test)] - pub(super) fn write_bytes_total(&self) -> u64 { - self.write_bytes_total.into() - } -} - // ===== impl CloseScopes ===== -impl CloseScopes { - metrics! { - tcp_close_total: Counter { "Total count of closed connections" }, - tcp_connection_duration_ms: Histogram { "Connection lifetimes" } - } -} - impl fmt::Display for CloseScopes { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.is_empty() { return Ok(()); } - Self::tcp_close_total.fmt_help(f)?; - Self::tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; + tcp_close_total.fmt_help(f)?; + tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; - Self::tcp_connection_duration_ms.fmt_help(f)?; - Self::tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; + tcp_connection_duration_ms.fmt_help(f)?; + tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; Ok(()) } } -// ===== impl CloseMetrics ===== - -impl CloseMetrics { - pub(super) fn close(&mut self, duration: Duration) { - self.close_total.incr(); - self.connection_duration.add(duration); - } - - #[cfg(test)] - pub(super) fn close_total(&self) -> u64 { - self.close_total.into() - } - - #[cfg(test)] - pub(super) fn connection_duration(&self) -> &Histogram { - &self.connection_duration - } -} - - // ===== impl TransportLabels ===== impl TransportLabels { - pub fn new(ctx: &ctx::transport::Ctx) -> Self { + fn new(ctx: &ctx::transport::Ctx) -> Self { TransportLabels { direction: Direction::from_context(ctx.proxy().as_ref()), peer: match *ctx { @@ -181,11 +215,6 @@ impl TransportLabels { tls_status: ctx.tls_status().into(), } } - - #[cfg(test)] - pub fn tls_status(&self) -> TlsStatus { - self.tls_status - } } impl fmt::Display for TransportLabels { @@ -206,36 +235,33 @@ impl fmt::Display for Peer { // ===== impl TransportCloseLabels ===== impl TransportCloseLabels { - pub fn new(ctx: &ctx::transport::Ctx, - close: &event::TransportClose) - -> Self { - let classification = Classification::transport_close(close); - let errno = close.errno.map(|code| { - // If the error code is set, this should be classified - // as a failure! - debug_assert!(classification == Classification::Failure); - Errno::from(code) - }); - TransportCloseLabels { - transport: TransportLabels::new(ctx), - classification, - errno, + fn new(transport: TransportLabels, eos: Eos) -> Self { + Self { + transport, + eos, } } - - #[cfg(test)] - pub fn tls_status(&self) -> TlsStatus { - self.transport.tls_status() - } } impl fmt::Display for TransportCloseLabels { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{},{}", self.transport, self.classification)?; - if let Some(errno) = self.errno { - write!(f, ",errno=\"{}\"", errno)?; - } - Ok(()) + write!(f, "{},{}", self.transport, self.eos) } } +// ===== impl Eos ===== + +impl fmt::Display for Eos { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self { + Eos::Clean => f.pad("classification=\"success\""), + Eos::Error { errno } => { + f.pad("classification=\"failure\"")?; + if let Some(e) = errno { + write!(f, ",errno=\"{}\"", e)?; + } + Ok(()) + } + } + } +}