diff --git a/src/telemetry/http/mod.rs b/src/telemetry/http/mod.rs index 18e7a1a84..333e6ce71 100644 --- a/src/telemetry/http/mod.rs +++ b/src/telemetry/http/mod.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use super::metrics::{ @@ -8,6 +9,7 @@ use super::metrics::{ Histogram, Scopes, }; +use telemetry::tap::Taps; pub mod event; mod labels; @@ -15,18 +17,53 @@ mod record; mod sensors; pub mod service; -pub use self::labels::{RequestLabels, ResponseLabels}; -pub use self::record::Record; +use self::labels::{RequestLabels, ResponseLabels}; +use self::record::Record; pub use self::sensors::Sensors; -pub(super) type RequestScopes = Scopes>; +metrics! { + request_total: Counter { "Total count of HTTP requests." }, + response_total: Counter { "Total count of HTTP responses" }, + response_latency_ms: Histogram { + "Elapsed times between a request's headers being received \ + and its response stream completing" + } +} + +pub fn new(taps: &Arc>) -> (Sensors, Report) { + let inner = Arc::new(Mutex::new(Inner::default())); + let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps); + (sensors, Report(inner)) +} + +/// Updates HTTP metrics. +/// +/// TODO Currently, this is only used by `Record`. Later, this will be made +/// public and `Record` will be obviated. +#[derive(Clone, Debug)] +struct Registry(Arc>); + +/// Reports HTTP metrics for Prometheus. +/// +/// TODO retain_since should be done implicitly and should not be part of the +/// public interface. 
+#[derive(Clone, Debug)] +pub struct Report(Arc>); #[derive(Debug, Default)] -pub(super) struct RequestMetrics { +struct Inner { + requests: RequestScopes, + responses: ResponseScopes, +} + +type RequestScopes = Scopes>; + +#[derive(Debug, Default)] +struct RequestMetrics { total: Counter, } -pub(super) type ResponseScopes = Scopes>; +type ResponseScopes = Scopes>; #[derive(Debug, Default)] pub struct ResponseMetrics { @@ -35,27 +72,69 @@ pub struct ResponseMetrics { } #[derive(Debug)] -pub(super) struct Stamped { +struct Stamped { stamp: Instant, inner: T, } -// ===== impl RequestScopes ===== +// ===== impl Registry ===== -impl RequestScopes { - metrics! { - request_total: Counter { "Total count of HTTP requests." } +impl Registry { + + #[cfg(test)] + fn for_test() -> Self { + Registry(Arc::new(Mutex::new(Inner::default()))) + } + + fn end_request(&mut self, labels: RequestLabels) { + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; + + inner.requests.get_or_default(labels).stamped().end() + } + + fn end_response(&mut self, labels: ResponseLabels, latency: Duration) { + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; + + inner.responses.get_or_default(labels).stamped().end(latency) } } -impl FmtMetrics for RequestScopes { +// ===== impl Report ===== + +impl Report { + pub(super) fn retain_since(&mut self, epoch: Instant) { + if let Ok(mut inner) = self.0.lock() { + inner.requests.retain(|_, v| v.stamp >= epoch); + inner.responses.retain(|_, v| v.stamp >= epoch); + } + } +} + +impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { - return Ok(()); + let inner = match self.0.lock() { + Err(_) => return Ok(()), + Ok(inner) => inner, + }; + + if !inner.requests.is_empty() { + request_total.fmt_help(f)?; + request_total.fmt_scopes(f, &inner.requests, |s| &s.total)?; } - Self::request_total.fmt_help(f)?; - Self::request_total.fmt_scopes(f, 
self, |s| &s.total)?; + if !inner.responses.is_empty() { + response_total.fmt_help(f)?; + response_total.fmt_scopes(f, &inner.responses, |s| &s.total)?; + + response_latency_ms.fmt_help(f)?; + response_latency_ms.fmt_scopes(f, &inner.responses, |s| &s.latency)?; + } Ok(()) } @@ -74,34 +153,6 @@ impl RequestMetrics { } } -// ===== impl ResponseScopes ===== - -impl ResponseScopes { - metrics! { - response_total: Counter { "Total count of HTTP responses" }, - response_latency_ms: Histogram { - "Elapsed times between a request's headers being received \ - and its response stream completing" - } - } -} - -impl FmtMetrics for ResponseScopes { - fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { - return Ok(()); - } - - Self::response_total.fmt_help(f)?; - Self::response_total.fmt_scopes(f, self, |s| &s.total)?; - - Self::response_latency_ms.fmt_help(f)?; - Self::response_latency_ms.fmt_scopes(f, self, |s| &s.latency)?; - - Ok(()) - } -} - // ===== impl ResponseMetrics ===== impl ResponseMetrics { @@ -124,11 +175,7 @@ impl ResponseMetrics { // ===== impl Stamped ===== impl Stamped { - pub fn stamp(&self) -> Instant { - self.stamp - } - - pub fn stamped(&mut self) -> &mut T { + fn stamped(&mut self) -> &mut T { self.stamp = Instant::now(); &mut self.inner } @@ -155,3 +202,75 @@ impl ::std::ops::Deref for Stamped { &self.inner } } + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use ctx; + use ctx::test_util::*; + use super::*; + use conditional::Conditional; + use tls; + + const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = + Conditional::None(tls::ReasonForNoTls::Disabled); + + fn mock_route( + registry: &mut Registry, + proxy: ctx::Proxy, + server: &Arc, + team: &str + ) { + let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); + let (req, rsp) = request("http://nba.com", &server, &client); + registry.end_request(RequestLabels::new(&req)); + 
registry.end_response(ResponseLabels::new(&rsp, None), Duration::from_millis(10)); + } + + #[test] + fn expiry() { + let proxy = ctx::Proxy::Outbound; + + let server = server(proxy, TLS_DISABLED); + + let inner = Arc::new(Mutex::new(Inner::default())); + let mut registry = Registry(inner.clone()); + let mut report = Report(inner.clone()); + + let t0 = Instant::now(); + + mock_route(&mut registry, proxy, &server, "warriors"); + let t1 = Instant::now(); + + mock_route(&mut registry, proxy, &server, "sixers"); + let t2 = Instant::now(); + + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); + } + + report.retain_since(t0); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); + } + + report.retain_since(t1); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 1); + assert_eq!(inner.responses.len(), 1); + } + + report.retain_since(t2); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 0); + assert_eq!(inner.responses.len(), 0); + } + } +} diff --git a/src/telemetry/http/record.rs b/src/telemetry/http/record.rs index 54ec99c29..841c8fb8c 100644 --- a/src/telemetry/http/record.rs +++ b/src/telemetry/http/record.rs @@ -1,35 +1,23 @@ -use std::sync::{Arc, Mutex}; - -use telemetry::http::event::Event; -use telemetry::metrics::Root; -use super::labels::{ - RequestLabels, - ResponseLabels, -}; +use super::Registry; +use super::event::Event; +use super::labels::{RequestLabels, ResponseLabels}; /// Tracks Prometheus metrics #[derive(Clone, Debug)] pub struct Record { - metrics: Arc>, + metrics: Registry, } // ===== impl Record ===== impl Record { - pub fn new(metrics: &Arc>) -> Self { - Self { metrics: metrics.clone() } + pub(super) fn new(metrics: Registry) -> Self { + Self { metrics } } #[cfg(test)] pub fn for_test() -> Self { - Self { metrics: Default::default() } - } - - #[inline] - fn 
update(&mut self, f: F) { - let mut lock = self.metrics.lock() - .expect("metrics lock poisoned"); - f(&mut *lock); + Self { metrics: Registry::for_test() } } /// Observe the given event. @@ -40,34 +28,25 @@ impl Record { Event::StreamRequestOpen(_) => {}, Event::StreamRequestFail(ref req, _) => { - self.update(|metrics| { - metrics.request(RequestLabels::new(req)).end(); - }) + self.metrics.end_request(RequestLabels::new(req)); }, Event::StreamRequestEnd(ref req, _) => { - self.update(|metrics| { - metrics.request(RequestLabels::new(req)).end(); - }) + self.metrics.end_request(RequestLabels::new(req)); }, Event::StreamResponseOpen(_, _) => {}, Event::StreamResponseEnd(ref res, ref end) => { let latency = end.response_first_frame_at - end.request_open_at; - self.update(|metrics| { - metrics.response(ResponseLabels::new(res, end.grpc_status)) - .end(latency); - }); + self.metrics.end_response(ResponseLabels::new(res, end.grpc_status), latency); }, Event::StreamResponseFail(ref res, ref fail) => { // TODO: do we care about the failure's error code here? 
let first_frame_at = fail.response_first_frame_at.unwrap_or(fail.response_fail_at); let latency = first_frame_at - fail.request_open_at; - self.update(|metrics| { - metrics.response(ResponseLabels::fail(res)).end(latency) - }); + self.metrics.end_response(ResponseLabels::fail(res), latency); }, }; } @@ -75,19 +54,22 @@ impl Record { #[cfg(test)] mod test { - use std::time::{Duration, Instant}; - + use super::*; + use super::super::event; + use super::super::labels::{RequestLabels, ResponseLabels}; use ctx::{self, test_util::*, transport::TlsStatus}; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; use conditional::Conditional; - use telemetry::http::{event::{self, Event}, labels}; use tls; const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(()); const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::None(tls::ReasonForNoTls::Disabled); - fn new_record() -> super::Record { - super::Record::new(&Default::default()) + fn new_record() -> (Record, Arc>) { + let inner = Arc::new(Mutex::new(super::super::Inner::default())); + (Record::new(Registry(inner.clone())), inner) } fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { @@ -116,23 +98,17 @@ mod test { frames_sent: 0, }; - let mut r = new_record(); + let (mut r, i) = new_record(); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); - let labels = labels::ResponseLabels::new(&rsp, None); + let labels = ResponseLabels::new(&rsp, None); assert_eq!(labels.tls_status(), client_tls); - assert!(r.metrics.lock() - .expect("lock") - .responses - .get(&labels) - .is_none() - ); + assert!(i.lock().unwrap().responses.get(&labels).is_none()); r.record_event(&ev); { - let lock = r.metrics.lock() - .expect("lock"); + let lock = i.lock().unwrap(); let scope = lock .responses .get(&labels) @@ -144,12 +120,10 @@ mod test { scope.latency().assert_lt_exactly(200, 0); scope.latency().assert_gt_exactly(200, 0); } - } fn 
test_record_one_conn_request_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { use self::Event::*; - use self::labels::*; let proxy = ctx::Proxy::Outbound; let server = server(proxy, server_tls); @@ -189,7 +163,7 @@ mod test { }), ]; - let mut r = new_record(); + let (mut r, i) = new_record(); let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); @@ -198,7 +172,7 @@ mod test { assert_eq!(client_tls, rsp_labels.tls_status()); { - let lock = r.metrics.lock().expect("lock"); + let lock = i.lock().unwrap(); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); } @@ -208,7 +182,7 @@ mod test { } { - let lock = r.metrics.lock().expect("lock"); + let lock = i.lock().unwrap(); // === request scope ==================================== assert_eq!( diff --git a/src/telemetry/http/sensors.rs b/src/telemetry/http/sensors.rs index 4d7339a3a..d0c80600f 100644 --- a/src/telemetry/http/sensors.rs +++ b/src/telemetry/http/sensors.rs @@ -5,10 +5,10 @@ use tower_service::NewService; use tower_h2::Body; use ctx; -use telemetry::tap; +use telemetry::{http::event, tap}; use transparency::ClientError; -use super::{event, Record}; +use super::record::Record; use super::service::{NewHttp, RequestBody}; #[derive(Clone, Debug)] @@ -42,7 +42,7 @@ impl Handle { } impl Sensors { - pub fn new(metrics: Record, taps: &Arc>) -> Self { + pub(super) fn new(metrics: Record, taps: &Arc>) -> Self { Sensors(Inner { metrics, taps: taps.clone(), diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index c12443889..b5ab89dd6 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -26,10 +26,8 @@ //! labels, we can add new labels or modify the existing ones without having //! to worry about missing commas, double commas, or trailing commas at the //! end of the label set (all of which will make Prometheus angry). 
-use std::default::Default; use std::fmt; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Instant; mod counter; mod gauge; @@ -48,65 +46,42 @@ pub use self::serve::Serve; use super::{http, process, tls_config_reload, transport}; /// The root scope for all runtime metrics. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Root { - pub(super) requests: http::RequestScopes, - pub(super) responses: http::ResponseScopes, + http: http::Report, transports: transport::Report, tls_config_reload: tls_config_reload::Report, process: process::Report, } -/// Construct the Prometheus metrics. -/// -/// Returns the `Record` and `Serve` sides. The `Serve` side -/// is a Hyper service which can be used to create the server for the -/// scrape endpoint, while the `Record` side can receive updates to the -/// metrics by calling `record_event`. -pub fn new( - idle_retain: Duration, - process: process::Report, - transport_report: transport::Report, - tls: tls_config_reload::Report -) -> (http::Record, Serve) { - let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls))); - (http::Record::new(&metrics), Serve::new(&metrics, idle_retain)) +// ===== impl Root ===== + +impl Root { + pub(super) fn new( + http: http::Report, + transports: transport::Report, + tls_config_reload: tls_config_reload::Report, + process: process::Report, + ) -> Self { + Self { + http, + transports, + tls_config_reload, + process, + } + } + + // TODO this should be moved into `http` + fn retain_since(&mut self, epoch: Instant) { + self.http.retain_since(epoch); + } } // ===== impl Root ===== -impl Root { - fn new( - process: process::Report, - transports: transport::Report, - tls_config_reload: tls_config_reload::Report - ) -> Self { - Self { - process, - transports, - tls_config_reload, - .. 
Root::default() - } - } - - pub(super) fn request(&mut self, labels: http::RequestLabels) -> &mut http::RequestMetrics { - self.requests.get_or_default(labels).stamped() - } - - pub(super) fn response(&mut self, labels: http::ResponseLabels) -> &mut http::ResponseMetrics { - self.responses.get_or_default(labels).stamped() - } - - fn retain_since(&mut self, epoch: Instant) { - self.requests.retain(|_, v| v.stamp() >= epoch); - self.responses.retain(|_, v| v.stamp() >= epoch); - } -} - impl FmtMetrics for Root { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.requests.fmt_metrics(f)?; - self.responses.fmt_metrics(f)?; + self.http.fmt_metrics(f)?; self.transports.fmt_metrics(f)?; self.tls_config_reload.fmt_metrics(f)?; self.process.fmt_metrics(f)?; @@ -114,60 +89,3 @@ impl FmtMetrics for Root { Ok(()) } } - - -#[cfg(test)] -mod tests { - use ctx; - use ctx::test_util::*; - use super::*; - use conditional::Conditional; - use tls; - - const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = - Conditional::None(tls::ReasonForNoTls::Disabled); - - fn mock_route( - root: &mut Root, - proxy: ctx::Proxy, - server: &Arc, - team: &str - ) { - let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); - let (req, rsp) = request("http://nba.com", &server, &client); - root.request(http::RequestLabels::new(&req)).end(); - root.response(http::ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); - } - - #[test] - fn expiry() { - let proxy = ctx::Proxy::Outbound; - - let server = server(proxy, TLS_DISABLED); - - let mut root = Root::default(); - - let t0 = Instant::now(); - - mock_route(&mut root, proxy, &server, "warriors"); - let t1 = Instant::now(); - - mock_route(&mut root, proxy, &server, "sixers"); - let t2 = Instant::now(); - - assert_eq!(root.requests.len(), 2); - assert_eq!(root.responses.len(), 2); - - root.retain_since(t0); - assert_eq!(root.requests.len(), 2); - assert_eq!(root.responses.len(), 2); - - 
root.retain_since(t1); - assert_eq!(root.requests.len(), 1); - assert_eq!(root.responses.len(), 1); - - root.retain_since(t2); - assert_eq!(root.requests.len(), 0); - assert_eq!(root.responses.len(), 0); - } -} diff --git a/src/telemetry/metrics/serve.rs b/src/telemetry/metrics/serve.rs index 89291eedf..51b7d583f 100644 --- a/src/telemetry/metrics/serve.rs +++ b/src/telemetry/metrics/serve.rs @@ -35,7 +35,7 @@ enum ServeError { // ===== impl Serve ===== impl Serve { - pub(super) fn new(metrics: &Arc>, idle_retain: Duration) -> Self { + pub fn new(metrics: &Arc>, idle_retain: Duration) -> Self { Serve { metrics: metrics.clone(), idle_retain, diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 2643037ee..ee0ca5fb1 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -34,15 +34,16 @@ pub fn new( taps: &Arc>, ) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) { let process = process::Report::new(start_time); + let (http_sensors, http_report) = http::new(taps); let (transport_registry, transport_report) = transport::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); - let (record, serve) = metrics::new( - metrics_retain_idle, - process, + let report = Arc::new(Mutex::new(metrics::Root::new( + http_report, transport_report, - tls_config_fmt - ); - let s = Sensors::new(record, taps); - (s, transport_registry, tls_config_sensor, serve) + tls_config_fmt, + process, + ))); + let serve = ServeMetrics::new(&report, metrics_retain_idle); + (http_sensors, transport_registry, tls_config_sensor, serve) }