From dfcc0086b8bed82ec964186e56fe81982d9b5650 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 22 Aug 2018 15:17:11 -0700 Subject: [PATCH] Hide HTTP telemetry implementation details (#80) Previously, many of `telemetry::http`'s types and internal implementation details were exposed to the rest of the telemetry system. In preparation for further changes to support more granular locking, this change turns metric storage and recording into implementation details. Following this, `telemetry::http` exposes a `Report` type for printing metrics to the server and a `Sensors` type used to instrument stacks with HTTP telemetry. These types share an internally-mutable metrics registry that is private to the http module. The `event` types continue to be exposed to support Tap, but the convenience exports have been removed. The `metrics::Root` type no longer needs to be shareable. This type will be replaced in a follow-up change. --- src/telemetry/http/mod.rs | 215 +++++++++++++++++++++++++-------- src/telemetry/http/record.rs | 78 ++++-------- src/telemetry/http/sensors.rs | 6 +- src/telemetry/metrics/mod.rs | 132 ++++---------------- src/telemetry/metrics/serve.rs | 2 +- src/telemetry/mod.rs | 15 +-- 6 files changed, 230 insertions(+), 218 deletions(-) diff --git a/src/telemetry/http/mod.rs b/src/telemetry/http/mod.rs index 18e7a1a84..333e6ce71 100644 --- a/src/telemetry/http/mod.rs +++ b/src/telemetry/http/mod.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use super::metrics::{ @@ -8,6 +9,7 @@ use super::metrics::{ Histogram, Scopes, }; +use telemetry::tap::Taps; pub mod event; mod labels; @@ -15,18 +17,53 @@ mod record; mod sensors; pub mod service; -pub use self::labels::{RequestLabels, ResponseLabels}; -pub use self::record::Record; +use self::labels::{RequestLabels, ResponseLabels}; +use self::record::Record; pub use self::sensors::Sensors; -pub(super) type RequestScopes = Scopes>; +metrics! 
{ + request_total: Counter { "Total count of HTTP requests." }, + response_total: Counter { "Total count of HTTP responses" }, + response_latency_ms: Histogram { + "Elapsed times between a request's headers being received \ + and its response stream completing" + } +} + +pub fn new(taps: &Arc>) -> (Sensors, Report) { + let inner = Arc::new(Mutex::new(Inner::default())); + let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps); + (sensors, Report(inner)) +} + +/// Updates HTTP metrics. +/// +/// TODO Currently, this is only used by `Record`. Later, this will be made +/// public and `Record` will be obviated. +#[derive(Clone, Debug)] +struct Registry(Arc>); + +/// Reports HTTP metrics for Prometheus. +/// +/// TODO retain_since should be done implicitly and should not be part of the +/// public interface. +#[derive(Clone, Debug)] +pub struct Report(Arc>); #[derive(Debug, Default)] -pub(super) struct RequestMetrics { +struct Inner { + requests: RequestScopes, + responses: ResponseScopes, +} + +type RequestScopes = Scopes>; + +#[derive(Debug, Default)] +struct RequestMetrics { total: Counter, } -pub(super) type ResponseScopes = Scopes>; +type ResponseScopes = Scopes>; #[derive(Debug, Default)] pub struct ResponseMetrics { @@ -35,27 +72,69 @@ pub struct ResponseMetrics { } #[derive(Debug)] -pub(super) struct Stamped { +struct Stamped { stamp: Instant, inner: T, } -// ===== impl RequestScopes ===== +// ===== impl Registry ===== -impl RequestScopes { - metrics! { - request_total: Counter { "Total count of HTTP requests." 
} +impl Registry { + + #[cfg(test)] + fn for_test() -> Self { + Registry(Arc::new(Mutex::new(Inner::default()))) + } + + fn end_request(&mut self, labels: RequestLabels) { + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; + + inner.requests.get_or_default(labels).stamped().end() + } + + fn end_response(&mut self, labels: ResponseLabels, latency: Duration) { + let mut inner = match self.0.lock() { + Err(_) => return, + Ok(lock) => lock, + }; + + inner.responses.get_or_default(labels).stamped().end(latency) } } -impl FmtMetrics for RequestScopes { +// ===== impl Report ===== + +impl Report { + pub(super) fn retain_since(&mut self, epoch: Instant) { + if let Ok(mut inner) = self.0.lock() { + inner.requests.retain(|_, v| v.stamp >= epoch); + inner.responses.retain(|_, v| v.stamp >= epoch); + } + } +} + +impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { - return Ok(()); + let inner = match self.0.lock() { + Err(_) => return Ok(()), + Ok(inner) => inner, + }; + + if !inner.requests.is_empty() { + request_total.fmt_help(f)?; + request_total.fmt_scopes(f, &inner.requests, |s| &s.total)?; } - Self::request_total.fmt_help(f)?; - Self::request_total.fmt_scopes(f, self, |s| &s.total)?; + if !inner.responses.is_empty() { + response_total.fmt_help(f)?; + response_total.fmt_scopes(f, &inner.responses, |s| &s.total)?; + + response_latency_ms.fmt_help(f)?; + response_latency_ms.fmt_scopes(f, &inner.responses, |s| &s.latency)?; + } Ok(()) } @@ -74,34 +153,6 @@ impl RequestMetrics { } } -// ===== impl ResponseScopes ===== - -impl ResponseScopes { - metrics! 
{ - response_total: Counter { "Total count of HTTP responses" }, - response_latency_ms: Histogram { - "Elapsed times between a request's headers being received \ - and its response stream completing" - } - } -} - -impl FmtMetrics for ResponseScopes { - fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.is_empty() { - return Ok(()); - } - - Self::response_total.fmt_help(f)?; - Self::response_total.fmt_scopes(f, self, |s| &s.total)?; - - Self::response_latency_ms.fmt_help(f)?; - Self::response_latency_ms.fmt_scopes(f, self, |s| &s.latency)?; - - Ok(()) - } -} - // ===== impl ResponseMetrics ===== impl ResponseMetrics { @@ -124,11 +175,7 @@ impl ResponseMetrics { // ===== impl Stamped ===== impl Stamped { - pub fn stamp(&self) -> Instant { - self.stamp - } - - pub fn stamped(&mut self) -> &mut T { + fn stamped(&mut self) -> &mut T { self.stamp = Instant::now(); &mut self.inner } @@ -155,3 +202,75 @@ impl ::std::ops::Deref for Stamped { &self.inner } } + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use ctx; + use ctx::test_util::*; + use super::*; + use conditional::Conditional; + use tls; + + const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = + Conditional::None(tls::ReasonForNoTls::Disabled); + + fn mock_route( + registry: &mut Registry, + proxy: ctx::Proxy, + server: &Arc, + team: &str + ) { + let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); + let (req, rsp) = request("http://nba.com", &server, &client); + registry.end_request(RequestLabels::new(&req)); + registry.end_response(ResponseLabels::new(&rsp, None), Duration::from_millis(10)); + } + + #[test] + fn expiry() { + let proxy = ctx::Proxy::Outbound; + + let server = server(proxy, TLS_DISABLED); + + let inner = Arc::new(Mutex::new(Inner::default())); + let mut registry = Registry(inner.clone()); + let mut report = Report(inner.clone()); + + let t0 = Instant::now(); + + mock_route(&mut registry, proxy, &server, "warriors"); + let t1 
= Instant::now(); + + mock_route(&mut registry, proxy, &server, "sixers"); + let t2 = Instant::now(); + + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); + } + + report.retain_since(t0); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 2); + assert_eq!(inner.responses.len(), 2); + } + + report.retain_since(t1); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 1); + assert_eq!(inner.responses.len(), 1); + } + + report.retain_since(t2); + { + let inner = inner.lock().unwrap(); + assert_eq!(inner.requests.len(), 0); + assert_eq!(inner.responses.len(), 0); + } + } +} diff --git a/src/telemetry/http/record.rs b/src/telemetry/http/record.rs index 54ec99c29..841c8fb8c 100644 --- a/src/telemetry/http/record.rs +++ b/src/telemetry/http/record.rs @@ -1,35 +1,23 @@ -use std::sync::{Arc, Mutex}; - -use telemetry::http::event::Event; -use telemetry::metrics::Root; -use super::labels::{ - RequestLabels, - ResponseLabels, -}; +use super::Registry; +use super::event::Event; +use super::labels::{RequestLabels, ResponseLabels}; /// Tracks Prometheus metrics #[derive(Clone, Debug)] pub struct Record { - metrics: Arc>, + metrics: Registry, } // ===== impl Record ===== impl Record { - pub fn new(metrics: &Arc>) -> Self { - Self { metrics: metrics.clone() } + pub(super) fn new(metrics: Registry) -> Self { + Self { metrics } } #[cfg(test)] pub fn for_test() -> Self { - Self { metrics: Default::default() } - } - - #[inline] - fn update(&mut self, f: F) { - let mut lock = self.metrics.lock() - .expect("metrics lock poisoned"); - f(&mut *lock); + Self { metrics: Registry::for_test() } } /// Observe the given event. 
@@ -40,34 +28,25 @@ impl Record { Event::StreamRequestOpen(_) => {}, Event::StreamRequestFail(ref req, _) => { - self.update(|metrics| { - metrics.request(RequestLabels::new(req)).end(); - }) + self.metrics.end_request(RequestLabels::new(req)); }, Event::StreamRequestEnd(ref req, _) => { - self.update(|metrics| { - metrics.request(RequestLabels::new(req)).end(); - }) + self.metrics.end_request(RequestLabels::new(req)); }, Event::StreamResponseOpen(_, _) => {}, Event::StreamResponseEnd(ref res, ref end) => { let latency = end.response_first_frame_at - end.request_open_at; - self.update(|metrics| { - metrics.response(ResponseLabels::new(res, end.grpc_status)) - .end(latency); - }); + self.metrics.end_response(ResponseLabels::new(res, end.grpc_status), latency); }, Event::StreamResponseFail(ref res, ref fail) => { // TODO: do we care about the failure's error code here? let first_frame_at = fail.response_first_frame_at.unwrap_or(fail.response_fail_at); let latency = first_frame_at - fail.request_open_at; - self.update(|metrics| { - metrics.response(ResponseLabels::fail(res)).end(latency) - }); + self.metrics.end_response(ResponseLabels::fail(res), latency); }, }; } @@ -75,19 +54,22 @@ impl Record { #[cfg(test)] mod test { - use std::time::{Duration, Instant}; - + use super::*; + use super::super::event; + use super::super::labels::{RequestLabels, ResponseLabels}; use ctx::{self, test_util::*, transport::TlsStatus}; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; use conditional::Conditional; - use telemetry::http::{event::{self, Event}, labels}; use tls; const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(()); const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::None(tls::ReasonForNoTls::Disabled); - fn new_record() -> super::Record { - super::Record::new(&Default::default()) + fn new_record() -> (Record, Arc>) { + let inner = Arc::new(Mutex::new(super::super::Inner::default())); + 
(Record::new(Registry(inner.clone())), inner) } fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { @@ -116,23 +98,17 @@ mod test { frames_sent: 0, }; - let mut r = new_record(); + let (mut r, i) = new_record(); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); - let labels = labels::ResponseLabels::new(&rsp, None); + let labels = ResponseLabels::new(&rsp, None); assert_eq!(labels.tls_status(), client_tls); - assert!(r.metrics.lock() - .expect("lock") - .responses - .get(&labels) - .is_none() - ); + assert!(i.lock().unwrap().responses.get(&labels).is_none()); r.record_event(&ev); { - let lock = r.metrics.lock() - .expect("lock"); + let lock = i.lock().unwrap(); let scope = lock .responses .get(&labels) @@ -144,12 +120,10 @@ mod test { scope.latency().assert_lt_exactly(200, 0); scope.latency().assert_gt_exactly(200, 0); } - } fn test_record_one_conn_request_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { use self::Event::*; - use self::labels::*; let proxy = ctx::Proxy::Outbound; let server = server(proxy, server_tls); @@ -189,7 +163,7 @@ mod test { }), ]; - let mut r = new_record(); + let (mut r, i) = new_record(); let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); @@ -198,7 +172,7 @@ mod test { assert_eq!(client_tls, rsp_labels.tls_status()); { - let lock = r.metrics.lock().expect("lock"); + let lock = i.lock().unwrap(); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); } @@ -208,7 +182,7 @@ mod test { } { - let lock = r.metrics.lock().expect("lock"); + let lock = i.lock().unwrap(); // === request scope ==================================== assert_eq!( diff --git a/src/telemetry/http/sensors.rs b/src/telemetry/http/sensors.rs index 4d7339a3a..d0c80600f 100644 --- a/src/telemetry/http/sensors.rs +++ b/src/telemetry/http/sensors.rs @@ -5,10 +5,10 @@ use tower_service::NewService; use tower_h2::Body; use ctx; -use 
telemetry::tap; +use telemetry::{http::event, tap}; use transparency::ClientError; -use super::{event, Record}; +use super::record::Record; use super::service::{NewHttp, RequestBody}; #[derive(Clone, Debug)] @@ -42,7 +42,7 @@ impl Handle { } impl Sensors { - pub fn new(metrics: Record, taps: &Arc>) -> Self { + pub(super) fn new(metrics: Record, taps: &Arc>) -> Self { Sensors(Inner { metrics, taps: taps.clone(), diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index c12443889..b5ab89dd6 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -26,10 +26,8 @@ //! labels, we can add new labels or modify the existing ones without having //! to worry about missing commas, double commas, or trailing commas at the //! end of the label set (all of which will make Prometheus angry). -use std::default::Default; use std::fmt; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Instant; mod counter; mod gauge; @@ -48,65 +46,42 @@ pub use self::serve::Serve; use super::{http, process, tls_config_reload, transport}; /// The root scope for all runtime metrics. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Root { - pub(super) requests: http::RequestScopes, - pub(super) responses: http::ResponseScopes, + http: http::Report, transports: transport::Report, tls_config_reload: tls_config_reload::Report, process: process::Report, } -/// Construct the Prometheus metrics. -/// -/// Returns the `Record` and `Serve` sides. The `Serve` side -/// is a Hyper service which can be used to create the server for the -/// scrape endpoint, while the `Record` side can receive updates to the -/// metrics by calling `record_event`. 
-pub fn new( - idle_retain: Duration, - process: process::Report, - transport_report: transport::Report, - tls: tls_config_reload::Report -) -> (http::Record, Serve) { - let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls))); - (http::Record::new(&metrics), Serve::new(&metrics, idle_retain)) +// ===== impl Root ===== + +impl Root { + pub(super) fn new( + http: http::Report, + transports: transport::Report, + tls_config_reload: tls_config_reload::Report, + process: process::Report, + ) -> Self { + Self { + http, + transports, + tls_config_reload, + process, + } + } + + // TODO this should be moved into `http` + fn retain_since(&mut self, epoch: Instant) { + self.http.retain_since(epoch); + } } // ===== impl Root ===== -impl Root { - fn new( - process: process::Report, - transports: transport::Report, - tls_config_reload: tls_config_reload::Report - ) -> Self { - Self { - process, - transports, - tls_config_reload, - .. Root::default() - } - } - - pub(super) fn request(&mut self, labels: http::RequestLabels) -> &mut http::RequestMetrics { - self.requests.get_or_default(labels).stamped() - } - - pub(super) fn response(&mut self, labels: http::ResponseLabels) -> &mut http::ResponseMetrics { - self.responses.get_or_default(labels).stamped() - } - - fn retain_since(&mut self, epoch: Instant) { - self.requests.retain(|_, v| v.stamp() >= epoch); - self.responses.retain(|_, v| v.stamp() >= epoch); - } -} - impl FmtMetrics for Root { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.requests.fmt_metrics(f)?; - self.responses.fmt_metrics(f)?; + self.http.fmt_metrics(f)?; self.transports.fmt_metrics(f)?; self.tls_config_reload.fmt_metrics(f)?; self.process.fmt_metrics(f)?; @@ -114,60 +89,3 @@ impl FmtMetrics for Root { Ok(()) } } - - -#[cfg(test)] -mod tests { - use ctx; - use ctx::test_util::*; - use super::*; - use conditional::Conditional; - use tls; - - const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = - 
Conditional::None(tls::ReasonForNoTls::Disabled); - - fn mock_route( - root: &mut Root, - proxy: ctx::Proxy, - server: &Arc, - team: &str - ) { - let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); - let (req, rsp) = request("http://nba.com", &server, &client); - root.request(http::RequestLabels::new(&req)).end(); - root.response(http::ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); - } - - #[test] - fn expiry() { - let proxy = ctx::Proxy::Outbound; - - let server = server(proxy, TLS_DISABLED); - - let mut root = Root::default(); - - let t0 = Instant::now(); - - mock_route(&mut root, proxy, &server, "warriors"); - let t1 = Instant::now(); - - mock_route(&mut root, proxy, &server, "sixers"); - let t2 = Instant::now(); - - assert_eq!(root.requests.len(), 2); - assert_eq!(root.responses.len(), 2); - - root.retain_since(t0); - assert_eq!(root.requests.len(), 2); - assert_eq!(root.responses.len(), 2); - - root.retain_since(t1); - assert_eq!(root.requests.len(), 1); - assert_eq!(root.responses.len(), 1); - - root.retain_since(t2); - assert_eq!(root.requests.len(), 0); - assert_eq!(root.responses.len(), 0); - } -} diff --git a/src/telemetry/metrics/serve.rs b/src/telemetry/metrics/serve.rs index 89291eedf..51b7d583f 100644 --- a/src/telemetry/metrics/serve.rs +++ b/src/telemetry/metrics/serve.rs @@ -35,7 +35,7 @@ enum ServeError { // ===== impl Serve ===== impl Serve { - pub(super) fn new(metrics: &Arc>, idle_retain: Duration) -> Self { + pub fn new(metrics: &Arc>, idle_retain: Duration) -> Self { Serve { metrics: metrics.clone(), idle_retain, diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 2643037ee..ee0ca5fb1 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -34,15 +34,16 @@ pub fn new( taps: &Arc>, ) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) { let process = process::Report::new(start_time); + let (http_sensors, http_report) = http::new(taps); let 
(transport_registry, transport_report) = transport::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); - let (record, serve) = metrics::new( - metrics_retain_idle, - process, + let report = Arc::new(Mutex::new(metrics::Root::new( + http_report, transport_report, - tls_config_fmt - ); - let s = Sensors::new(record, taps); - (s, transport_registry, tls_config_sensor, serve) + tls_config_fmt, + process, + ))); + let serve = ServeMetrics::new(&report, metrics_retain_idle); + (http_sensors, transport_registry, tls_config_sensor, serve) }