From b9051f0f457374b6ad16e3550eb1212c0ab49a83 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 14 Aug 2018 15:11:17 -0700 Subject: [PATCH] Narrow transport metric's API (#65) The `telemetry::metrics::transport` module exposes many of its implementation details to callers, which makes it difficult to make changes to how the module is structured. In preparation for further refactors, this change narrows the module's public API: All label and scope types have been made private. A single, public `Transports` type has been introduced to describe the entire public interface of the module. This has been crafted to be free of `event` types and to have minimal external dependencies. A new `transport::Eos` type has been introduced to replace the overloaded `labels::Classification` type -- this type was initially introduced to model _HTTP response_ classification, but it was reused for transports. This is an undesirable coupling that will have to be broken when we start to address HTTP response classification properly. 
--- src/telemetry/metrics/labels.rs | 10 -- src/telemetry/metrics/mod.rs | 41 +---- src/telemetry/metrics/record.rs | 108 ++++------- src/telemetry/metrics/transport.rs | 276 ++++++++++++++++------------- 4 files changed, 197 insertions(+), 238 deletions(-) diff --git a/src/telemetry/metrics/labels.rs b/src/telemetry/metrics/labels.rs index 5c1d2d52e..54cd3251b 100644 --- a/src/telemetry/metrics/labels.rs +++ b/src/telemetry/metrics/labels.rs @@ -9,7 +9,6 @@ use http; use ctx; use conditional::Conditional; -use telemetry::event; use transport::tls; #[derive(Clone, Debug, Eq, PartialEq, Hash)] @@ -192,15 +191,6 @@ impl Classification { grpc_status.map(Classification::grpc_status) .unwrap_or_else(|| Classification::http_status(&rsp.status)) } - - pub fn transport_close(close: &event::TransportClose) -> Self { - if close.clean { - Classification::Success - } else { - Classification::Failure - } - } - } impl fmt::Display for Classification { diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index d660c70b5..2d6524f92 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -51,11 +51,11 @@ use self::labels::{ RequestLabels, ResponseLabels, }; -use self::transport::{TransportLabels, TransportCloseLabels}; pub use self::labels::DstLabels; pub use self::record::Record; pub use self::scopes::Scopes; pub use self::serve::Serve; +pub use self::transport::Transports; use super::process; use super::tls_config_reload; @@ -92,8 +92,7 @@ pub struct Metric<'a, M: FmtMetric> { struct Root { requests: http::RequestScopes, responses: http::ResponseScopes, - transports: transport::OpenScopes, - transport_closes: transport::CloseScopes, + transports: Transports, tls_config_reload: tls_config_reload::Report, process: process::Report, } @@ -171,12 +170,8 @@ impl Root { self.responses.get_or_default(labels).stamped() } - fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics { - self.transports.get_or_default(labels) - 
} - - fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics { - self.transport_closes.get_or_default(labels) + fn transports(&mut self) -> &mut Transports { + &mut self.transports } fn retain_since(&mut self, epoch: Instant) { @@ -190,7 +185,6 @@ impl fmt::Display for Root { self.requests.fmt(f)?; self.responses.fmt(f)?; self.transports.fmt(f)?; - self.transport_closes.fmt(f)?; self.tls_config_reload.fmt(f)?; self.process.fmt(f)?; @@ -232,7 +226,6 @@ impl ::std::ops::Deref for Stamped { #[cfg(test)] mod tests { use ctx::test_util::*; - use telemetry::event; use super::*; use conditional::Conditional; use tls; @@ -250,22 +243,14 @@ mod tests { let (req, rsp) = request("http://nba.com", &server, &client); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); - let transport = TransportLabels::new(&client_transport); - root.transport(transport.clone()).open(); + root.transports.open(&client_transport); root.request(RequestLabels::new(&req)).end(); root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); - root.transport(transport).close(100, 200); - let end = TransportCloseLabels::new(&client_transport, &event::TransportClose { - clean: true, - errno: None, - duration: Duration::from_millis(15), - rx_bytes: 40, - tx_bytes: 0, - }); - root.transport_close(end).close(Duration::from_millis(15)); - } + let duration = Duration::from_millis(15); + root.transports().close(&client_transport, transport::Eos::Clean, duration, 100, 200); + } #[test] fn expiry() { @@ -278,7 +263,7 @@ mod tests { let mut root = Root::default(); let t0 = Instant::now(); - root.transport(TransportLabels::new(&server_transport)).open(); + root.transports().open(&server_transport); mock_route(&mut root, &proxy, &server, "warriors"); let t1 = Instant::now(); @@ -288,25 +273,17 @@ mod tests { assert_eq!(root.requests.len(), 2); assert_eq!(root.responses.len(), 2); - assert_eq!(root.transports.len(), 2); - 
assert_eq!(root.transport_closes.len(), 1); root.retain_since(t0); assert_eq!(root.requests.len(), 2); assert_eq!(root.responses.len(), 2); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); root.retain_since(t1); assert_eq!(root.requests.len(), 1); assert_eq!(root.responses.len(), 1); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); root.retain_since(t2); assert_eq!(root.requests.len(), 0); assert_eq!(root.responses.len(), 0); - assert_eq!(root.transports.len(), 2); - assert_eq!(root.transport_closes.len(), 1); } } diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/metrics/record.rs index 3a97c3f50..1f2a63242 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/metrics/record.rs @@ -6,7 +6,7 @@ use super::labels::{ RequestLabels, ResponseLabels, }; -use super::transport::{TransportLabels, TransportCloseLabels}; +use super::transport; /// Tracks Prometheus metrics #[derive(Clone, Debug)] @@ -68,17 +68,21 @@ impl Record { Event::TransportOpen(ref ctx) => { self.update(|metrics| { - metrics.transport(TransportLabels::new(ctx)).open(); + metrics.transports().open(ctx); }) }, Event::TransportClose(ref ctx, ref close) => { + let eos = if close.clean { + transport::Eos::Clean + } else { + transport::Eos::Error { + errno: close.errno.map(|e| e.into()) + } + }; self.update(|metrics| { - metrics.transport(TransportLabels::new(ctx)) - .close(close.rx_bytes, close.tx_bytes); - - metrics.transport_close(TransportCloseLabels::new(ctx, close)) - .close(close.duration); + metrics.transports() + .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes); }) }, }; @@ -89,7 +93,7 @@ impl Record { mod test { use telemetry::{ event, - metrics::{self, labels, transport::{TransportLabels, TransportCloseLabels}}, + metrics::{self, labels}, Event, }; use ctx::{self, test_util::*, transport::TlsStatus}; @@ -228,33 +232,16 @@ mod test { let req_labels = RequestLabels::new(&req); let 
rsp_labels = ResponseLabels::new(&rsp, None); - let srv_open_labels = TransportLabels::new(&server_transport); - let srv_close_labels = TransportCloseLabels::new( - &ctx::transport::Ctx::Server(server.clone()), - &transport_close, - ); - let client_open_labels = TransportLabels::new(&client_transport); - let client_close_labels = TransportCloseLabels::new( - &ctx::transport::Ctx::Client(client.clone()), - &transport_close, - ); assert_eq!(client_tls, req_labels.tls_status().into()); assert_eq!(client_tls, rsp_labels.tls_status().into()); - assert_eq!(client_tls, client_open_labels.tls_status().into()); - assert_eq!(client_tls, client_close_labels.tls_status().into()); - assert_eq!(server_tls, srv_open_labels.tls_status().into()); - assert_eq!(server_tls, srv_close_labels.tls_status().into()); { - let lock = r.metrics.lock() - .expect("lock"); + let mut lock = r.metrics.lock().expect("lock"); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); - assert!(lock.transports.get(&srv_open_labels).is_none()); - assert!(lock.transports.get(&client_open_labels).is_none()); - assert!(lock.transport_closes.get(&srv_close_labels).is_none()); - assert!(lock.transport_closes.get(&client_close_labels).is_none()); + assert_eq!(lock.transports().open_total(&server_transport), 0); + assert_eq!(lock.transports().open_total(&client_transport), 0); } for e in &events { @@ -262,8 +249,7 @@ mod test { } { - let lock = r.metrics.lock() - .expect("lock"); + let mut lock = r.metrics.lock().expect("lock"); // === request scope ==================================== assert_eq!( @@ -274,55 +260,35 @@ mod test { ); // === response scope =================================== - let response_scope = lock - .responses - .get(&rsp_labels) - .expect("response scope missing"); - assert_eq!(response_scope.total(), 1); + { + let response_scope = lock + .responses + .get(&rsp_labels) + .expect("response scope missing"); + assert_eq!(response_scope.total(), 
1); - response_scope.latency() - .assert_bucket_exactly(200, 1) - .assert_gt_exactly(200, 0) - .assert_lt_exactly(200, 0); - - // === server transport open scope ====================== - let srv_transport_scope = lock - .transports - .get(&srv_open_labels) - .expect("server transport scope missing"); - assert_eq!(srv_transport_scope.open_total(), 1); - assert_eq!(srv_transport_scope.write_bytes_total(), 4321); - assert_eq!(srv_transport_scope.read_bytes_total(), 4321); - - // === client transport open scope ====================== - let client_transport_scope = lock - .transports - .get(&client_open_labels) - .expect("client transport scope missing"); - assert_eq!(client_transport_scope.open_total(), 1); - assert_eq!(client_transport_scope.write_bytes_total(), 4321); - assert_eq!(client_transport_scope.read_bytes_total(), 4321); + response_scope.latency() + .assert_bucket_exactly(200, 1) + .assert_gt_exactly(200, 0) + .assert_lt_exactly(200, 0); + } + use super::transport::Eos; let transport_duration: u64 = 30_000 * 1_000; + let t = lock.transports(); - // === server transport close scope ===================== - let srv_transport_close_scope = lock - .transport_closes - .get(&srv_close_labels) - .expect("server transport close scope missing"); - assert_eq!(srv_transport_close_scope.close_total(), 1); - srv_transport_close_scope.connection_duration() + assert_eq!(t.open_total(&server_transport), 1); + assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321)); + assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); + t.connection_durations(&server_transport, Eos::Clean) .assert_bucket_exactly(transport_duration, 1) .assert_gt_exactly(transport_duration, 0) .assert_lt_exactly(transport_duration, 0); - // === client transport close scope ===================== - let client_transport_close_scope = lock - .transport_closes - .get(&client_close_labels) - .expect("client transport close scope missing"); - assert_eq!(client_transport_close_scope.close_total(), 
1); - client_transport_close_scope.connection_duration() + assert_eq!(t.open_total(&client_transport), 1); + assert_eq!(t.rx_tx_bytes_total(&client_transport), (4321, 4321)); + assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); + t.connection_durations(&server_transport, Eos::Clean) .assert_bucket_exactly(transport_duration, 1) .assert_gt_exactly(transport_duration, 0) .assert_lt_exactly(transport_duration, 0); diff --git a/src/telemetry/metrics/transport.rs b/src/telemetry/metrics/transport.rs index 71e75af7b..e4d91a723 100644 --- a/src/telemetry/metrics/transport.rs +++ b/src/telemetry/metrics/transport.rs @@ -3,30 +3,49 @@ use std::time::Duration; use ctx; use super::{ - labels::{Classification, Direction, TlsStatus}, + labels::{Direction, TlsStatus}, latency, Counter, Gauge, Histogram, Scopes, }; -use telemetry::{event, Errno}; +use telemetry::Errno; -pub(super) type OpenScopes = Scopes; +metrics! { + tcp_open_total: Counter { "Total count of opened connections" }, + tcp_open_connections: Gauge { "Number of currently-open connections" }, + tcp_read_bytes_total: Counter { "Total count of bytes read from peers" }, + tcp_write_bytes_total: Counter { "Total count of bytes written to peers" }, + + tcp_close_total: Counter { "Total count of closed connections" }, + tcp_connection_duration_ms: Histogram { "Connection lifetimes" } +} + +/// Holds all transport stats. +/// +/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. 
+#[derive(Debug, Default)] +pub struct Transports { + opens: OpenScopes, + closes: CloseScopes, +} + +type OpenScopes = Scopes; #[derive(Debug, Default)] -pub(super) struct OpenMetrics { +struct OpenMetrics { open_total: Counter, open_connections: Gauge, write_bytes_total: Counter, read_bytes_total: Counter, } -pub(super) type CloseScopes = Scopes; +type CloseScopes = Scopes; /// Labels describing a TCP connection #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct TransportLabels { +struct TransportLabels { /// Was the transport opened in the inbound or outbound direction? direction: Direction, @@ -37,141 +56,156 @@ pub struct TransportLabels { } #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub enum Peer { Src, Dst } +enum Peer { Src, Dst } + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub enum Eos { + Clean, + Error { + errno: Option, + }, +} /// Labels describing the end of a TCP connection #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct TransportCloseLabels { - /// Labels describing the TCP connection that closed. - pub(super) transport: TransportLabels, - - /// Was the transport closed successfully? - classification: Classification, - - /// If `classification` == `Failure`, this may be set with the - /// OS error number describing the error, if there was one. - /// Otherwise, it should be `None`. - errno: Option, +struct TransportCloseLabels { + transport: TransportLabels, + eos: Eos, } #[derive(Debug, Default)] -pub(super) struct CloseMetrics { +struct CloseMetrics { close_total: Counter, connection_duration: Histogram, } -// ===== impl OpenScopes ===== +// ===== impl Transports ===== -impl OpenScopes { - metrics! 
{ - tcp_open_total: Counter { "Total count of opened connections" }, - tcp_open_connections: Gauge { "Number of currently-open connections" }, - tcp_read_bytes_total: Counter { "Total count of bytes read from peers" }, - tcp_write_bytes_total: Counter { "Total count of bytes written to peers" } +impl Transports { + pub fn open(&mut self, ctx: &ctx::transport::Ctx) { + let k = TransportLabels::new(ctx); + let metrics = self.opens.get_or_default(k); + metrics.open_total.incr(); + metrics.open_connections.incr(); + } + + pub fn close( + &mut self, + ctx: &ctx::transport::Ctx, + eos: Eos, + duration: Duration, + rx: u64, + tx: u64, + ) { + let key = TransportLabels::new(ctx); + + let o = self.opens.get_or_default(key); + o.open_connections.decr(); + o.read_bytes_total += rx; + o.write_bytes_total += tx; + + let k = TransportCloseLabels::new(key, eos); + let c = self.closes.get_or_default(k); + c.close_total.incr(); + c.connection_duration.add(duration); + } + + #[cfg(test)] + pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { + self.opens + .get(&TransportLabels::new(ctx)) + .map(|m| m.open_total.into()) + .unwrap_or(0) + } + + // #[cfg(test)] + // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { + // self.metrics + // .get(&Key::from(ctx)) + // .map(|m| m.open_connections.into()) + // .unwrap_or(0) + // } + + #[cfg(test)] + pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { + self.opens + .get(&TransportLabels::new(ctx)) + .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) + .unwrap_or((0, 0)) + } + + #[cfg(test)] + pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { + self.closes + .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) + .map(|m| m.close_total.into()) + .unwrap_or(0) + } + + #[cfg(test)] + pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { + self.closes + .get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) 
+ .map(|m| m.connection_duration.clone()) + .unwrap_or_default() } } +impl fmt::Display for Transports { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.opens.fmt(f)?; + self.closes.fmt(f)?; + + Ok(()) + } +} + +// ===== impl OpenScopes ===== + impl fmt::Display for OpenScopes { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.is_empty() { return Ok(()); } - Self::tcp_open_total.fmt_help(f)?; - Self::tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; + tcp_open_total.fmt_help(f)?; + tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; - Self::tcp_open_connections.fmt_help(f)?; - Self::tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; + tcp_open_connections.fmt_help(f)?; + tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; - Self::tcp_read_bytes_total.fmt_help(f)?; - Self::tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; + tcp_read_bytes_total.fmt_help(f)?; + tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; - Self::tcp_write_bytes_total.fmt_help(f)?; - Self::tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; + tcp_write_bytes_total.fmt_help(f)?; + tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; Ok(()) } } -// ===== impl OpenMetrics ===== - -impl OpenMetrics { - pub(super) fn open(&mut self) { - self.open_total.incr(); - self.open_connections.incr(); - } - - pub(super) fn close(&mut self, rx: u64, tx: u64) { - self.open_connections.decr(); - self.read_bytes_total += rx; - self.write_bytes_total += tx; - } - - #[cfg(test)] - pub(super) fn open_total(&self) -> u64 { - self.open_total.into() - } - - #[cfg(test)] - pub(super) fn read_bytes_total(&self) -> u64 { - self.read_bytes_total.into() - } - - #[cfg(test)] - pub(super) fn write_bytes_total(&self) -> u64 { - self.write_bytes_total.into() - } -} - // ===== impl CloseScopes ===== -impl CloseScopes { - metrics! 
{ - tcp_close_total: Counter { "Total count of closed connections" }, - tcp_connection_duration_ms: Histogram { "Connection lifetimes" } - } -} - impl fmt::Display for CloseScopes { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.is_empty() { return Ok(()); } - Self::tcp_close_total.fmt_help(f)?; - Self::tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; + tcp_close_total.fmt_help(f)?; + tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; - Self::tcp_connection_duration_ms.fmt_help(f)?; - Self::tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; + tcp_connection_duration_ms.fmt_help(f)?; + tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; Ok(()) } } -// ===== impl CloseMetrics ===== - -impl CloseMetrics { - pub(super) fn close(&mut self, duration: Duration) { - self.close_total.incr(); - self.connection_duration.add(duration); - } - - #[cfg(test)] - pub(super) fn close_total(&self) -> u64 { - self.close_total.into() - } - - #[cfg(test)] - pub(super) fn connection_duration(&self) -> &Histogram { - &self.connection_duration - } -} - - // ===== impl TransportLabels ===== impl TransportLabels { - pub fn new(ctx: &ctx::transport::Ctx) -> Self { + fn new(ctx: &ctx::transport::Ctx) -> Self { TransportLabels { direction: Direction::from_context(ctx.proxy().as_ref()), peer: match *ctx { @@ -181,11 +215,6 @@ impl TransportLabels { tls_status: ctx.tls_status().into(), } } - - #[cfg(test)] - pub fn tls_status(&self) -> TlsStatus { - self.tls_status - } } impl fmt::Display for TransportLabels { @@ -206,36 +235,33 @@ impl fmt::Display for Peer { // ===== impl TransportCloseLabels ===== impl TransportCloseLabels { - pub fn new(ctx: &ctx::transport::Ctx, - close: &event::TransportClose) - -> Self { - let classification = Classification::transport_close(close); - let errno = close.errno.map(|code| { - // If the error code is set, this should be classified - // as a failure! 
- debug_assert!(classification == Classification::Failure); - Errno::from(code) - }); - TransportCloseLabels { - transport: TransportLabels::new(ctx), - classification, - errno, + fn new(transport: TransportLabels, eos: Eos) -> Self { + Self { + transport, + eos, } } - - #[cfg(test)] - pub fn tls_status(&self) -> TlsStatus { - self.transport.tls_status() - } } impl fmt::Display for TransportCloseLabels { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{},{}", self.transport, self.classification)?; - if let Some(errno) = self.errno { - write!(f, ",errno=\"{}\"", errno)?; - } - Ok(()) + write!(f, "{},{}", self.transport, self.eos) } } +// ===== impl Eos ===== + +impl fmt::Display for Eos { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self { + Eos::Clean => f.pad("classification=\"success\""), + Eos::Error { errno } => { + f.pad("classification=\"failure\"")?; + if let Some(e) = errno { + write!(f, ",errno=\"{}\"", e)?; + } + Ok(()) + } + } + } +}