diff --git a/proxy/src/telemetry/metrics/counter.rs b/proxy/src/telemetry/metrics/counter.rs index 808642831..1b473ff8e 100644 --- a/proxy/src/telemetry/metrics/counter.rs +++ b/proxy/src/telemetry/metrics/counter.rs @@ -63,6 +63,8 @@ impl ops::AddAssign for Counter { } impl FmtMetric for Counter { + const KIND: &'static str = "counter"; + fn fmt_metric(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result { writeln!(f, "{} {}", name, self.0) } diff --git a/proxy/src/telemetry/metrics/gauge.rs b/proxy/src/telemetry/metrics/gauge.rs index 768db2ebe..064b2cec9 100644 --- a/proxy/src/telemetry/metrics/gauge.rs +++ b/proxy/src/telemetry/metrics/gauge.rs @@ -39,6 +39,8 @@ impl Into for Gauge { } impl FmtMetric for Gauge { + const KIND: &'static str = "gauge"; + fn fmt_metric(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result { writeln!(f, "{} {}", name, self.0) } diff --git a/proxy/src/telemetry/metrics/histogram.rs b/proxy/src/telemetry/metrics/histogram.rs index ebe6aedf8..9e0d65975 100644 --- a/proxy/src/telemetry/metrics/histogram.rs +++ b/proxy/src/telemetry/metrics/histogram.rs @@ -103,6 +103,8 @@ impl<'a, V: Into> IntoIterator for &'a Histogram { } impl> FmtMetric for Histogram { + const KIND: &'static str = "histogram"; + fn fmt_metric(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result { let mut total = Counter::default(); for (le, count) in self { diff --git a/proxy/src/telemetry/metrics/http.rs b/proxy/src/telemetry/metrics/http.rs new file mode 100644 index 000000000..7b6859d26 --- /dev/null +++ b/proxy/src/telemetry/metrics/http.rs @@ -0,0 +1,93 @@ +use std::fmt; +use std::time::Duration; + +use super::{ + latency, + Counter, + Histogram, + Metric, + RequestLabels, + ResponseLabels, + Scopes +}; + +pub(super) type RequestScopes = Scopes; + +#[derive(Debug, Default)] +pub(super) struct RequestMetrics { + total: Counter, +} + +pub(super) type ResponseScopes = Scopes; + +#[derive(Debug, Default)] +pub struct ResponseMetrics { + total: Counter, + latency: Histogram, +} + +// ===== impl RequestScopes ===== + +impl RequestScopes { + metrics! { + request_total: Counter { "Total count of HTTP requests." } + } +} + +impl fmt::Display for RequestScopes { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.scopes.is_empty() { + return Ok(()); + } + + Self::request_total.fmt_help(f)?; + Self::request_total.fmt_scopes(f, &self, |s| &s.total)?; + + Ok(()) + } +} + +// ===== impl RequestMetrics ===== + +impl RequestMetrics { + pub fn end(&mut self) { + self.total.incr(); + } +} + +// ===== impl ResponseScopes ===== + +impl ResponseScopes { + metrics! { + response_total: Counter { "Total count of HTTP responses" }, + response_latency_ms: Histogram { + "Elapsed times between a request's headers being received \ + and its response stream completing" + } + } +} + +impl fmt::Display for ResponseScopes { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.scopes.is_empty() { + return Ok(()); + } + + Self::response_total.fmt_help(f)?; + Self::response_total.fmt_scopes(f, &self, |s| &s.total)?; + + Self::response_latency_ms.fmt_help(f)?; + Self::response_latency_ms.fmt_scopes(f, &self, |s| &s.latency)?; + + Ok(()) + } +} + +// ===== impl ResponseMetrics ===== + +impl ResponseMetrics { + pub fn end(&mut self, duration: Duration) { + self.total.incr(); + self.latency.add(duration); + } +} diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index 69479b0ae..e8074e842 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -29,20 +29,36 @@ use std::default::Default; use std::fmt::{self, Display}; use std::hash::Hash; +use std::marker::PhantomData; use std::sync::{Arc, Mutex}; -use std::time; +use std::time::UNIX_EPOCH; use indexmap::IndexMap; use ctx; +macro_rules! metrics { + { $( $name:ident : $kind:ty { $help:expr } ),+ } => { + $( + #[allow(non_upper_case_globals)] + const $name: Metric<'static, $kind> = Metric { + name: stringify!($name), + help: $help, + _p: ::std::marker::PhantomData, + }; + )+ + } +} + mod counter; mod gauge; mod histogram; +mod http; mod labels; mod latency; mod record; mod serve; +mod transport; use self::counter::Counter; use self::gauge::Gauge; @@ -63,6 +79,9 @@ pub use self::serve::Serve; /// differences in formatting each type of metric. Specifically, `Histogram` formats a /// counter for each bucket, as well as a count and total sum. trait FmtMetric { + /// The metric's `TYPE` in help messages. + const KIND: &'static str; + /// Writes a metric with the given name and no labels. fn fmt_metric(&self, f: &mut fmt::Formatter, name: N) -> fmt::Result; @@ -73,35 +92,32 @@ trait FmtMetric { L: Display; } -#[derive(Debug, Clone)] -struct Metrics { - request_total: Metric, - - response_total: Metric, - response_latency: Metric, ResponseLabels>, - - tcp: TcpMetrics, - - start_time: u64, +/// Describes a metric statically. +/// +/// Formats help messages and metric values for prometheus output. +struct Metric<'a, M: FmtMetric> { + name: &'a str, + help: &'a str, + _p: PhantomData, } -#[derive(Debug, Clone)] -struct TcpMetrics { - open_total: Metric, - close_total: Metric, +/// The root scope for all runtime metrics. +#[derive(Debug, Default)] +struct Root { + requests: http::RequestScopes, + responses: http::ResponseScopes, + transports: transport::OpenScopes, + transport_closes: transport::CloseScopes, - connection_duration: Metric, TransportCloseLabels>, - open_connections: Metric, - - write_bytes_total: Metric, - read_bytes_total: Metric, + start_time: Gauge, } -#[derive(Debug, Clone)] -struct Metric { - name: &'static str, - help: &'static str, - values: IndexMap +/// Holds an `S`-typed scope for each `L`-typed label set. +/// +/// An `S` type typically holds one or more metrics. +#[derive(Debug)] +struct Scopes { + scopes: IndexMap, } /// Construct the Prometheus metrics. @@ -111,255 +127,100 @@ struct Metric { /// scrape endpoint, while the `Record` side can receive updates to the /// metrics by calling `record_event`. pub fn new(process: &Arc) -> (Record, Serve){ - let metrics = Arc::new(Mutex::new(Metrics::new(process))); + let metrics = Arc::new(Mutex::new(Root::new(process))); (Record::new(&metrics), Serve::new(&metrics)) } -// ===== impl Metrics ===== - -impl Metrics { - - pub fn new(process: &Arc) -> Self { - - let start_time = process.start_time - .duration_since(time::UNIX_EPOCH) - .expect( - "process start time should not be before the beginning \ - of the Unix epoch" - ) - .as_secs(); - - let request_total = Metric::::new( - "request_total", - "A counter of the number of requests the proxy has received.", - ); - - let response_total = Metric::::new( - "response_total", - "A counter of the number of responses the proxy has received.", - ); - - let response_latency = Metric::, ResponseLabels>::new( - "response_latency_ms", - "A histogram of the total latency of a response. This is measured \ - from when the request headers are received to when the response \ - stream has completed.", - ); - - Metrics { - request_total, - response_total, - response_latency, - tcp: TcpMetrics::new(), - start_time, - } - } - - fn request_total(&mut self, - labels: RequestLabels) - -> &mut Counter { - self.request_total.values - .entry(labels) - .or_insert_with(Counter::default) - } - - fn response_latency(&mut self, - labels: ResponseLabels) - -> &mut Histogram { - self.response_latency.values - .entry(labels) - .or_insert_with(Histogram::default) - } - - fn response_total(&mut self, - labels: ResponseLabels) - -> &mut Counter { - self.response_total.values - .entry(labels) - .or_insert_with(Counter::default) - } - - fn tcp(&mut self) -> &mut TcpMetrics { - &mut self.tcp - } -} - -impl fmt::Display for Metrics { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - writeln!(f, "{}", self.request_total)?; - writeln!(f, "{}", self.response_total)?; - writeln!(f, "{}", self.response_latency)?; - writeln!(f, "{}", self.tcp)?; - - writeln!(f, "process_start_time_seconds {}", self.start_time)?; - Ok(()) - } -} - -// ===== impl TcpMetrics ===== - -impl TcpMetrics { - pub fn new() -> TcpMetrics { - let open_total = Metric::::new( - "tcp_open_total", - "A counter of the total number of transport connections.", - ); - - let close_total = Metric::::new( - "tcp_close_total", - "A counter of the total number of transport connections.", - ); - - let connection_duration = Metric::, TransportCloseLabels>::new( - "tcp_connection_duration_ms", - "A histogram of the duration of the lifetime of connections, in milliseconds", - ); - - let open_connections = Metric::::new( - "tcp_open_connections", - "A gauge of the number of transport connections currently open.", - ); - - let read_bytes_total = Metric::::new( - "tcp_read_bytes_total", - "A counter of the total number of recieved bytes." - ); - - let write_bytes_total = Metric::::new( - "tcp_write_bytes_total", - "A counter of the total number of sent bytes." - ); - - Self { - open_total, - close_total, - connection_duration, - open_connections, - read_bytes_total, - write_bytes_total, - } - } - - fn open_total(&mut self, labels: TransportLabels) -> &mut Counter { - self.open_total.values - .entry(labels) - .or_insert_with(Default::default) - } - - fn close_total(&mut self, labels: TransportCloseLabels) -> &mut Counter { - self.close_total.values - .entry(labels) - .or_insert_with(Counter::default) - } - - fn connection_duration(&mut self, labels: TransportCloseLabels) -> &mut Histogram { - self.connection_duration.values - .entry(labels) - .or_insert_with(Histogram::default) - } - - fn open_connections(&mut self, labels: TransportLabels) -> &mut Gauge { - self.open_connections.values - .entry(labels) - .or_insert_with(Gauge::default) - } - - fn write_bytes_total(&mut self, labels: TransportLabels) -> &mut Counter { - self.write_bytes_total.values - .entry(labels) - .or_insert_with(Counter::default) - } - - fn read_bytes_total(&mut self, labels: TransportLabels) -> &mut Counter { - self.read_bytes_total.values - .entry(labels) - .or_insert_with(Counter::default) - } -} - -impl fmt::Display for TcpMetrics { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - writeln!(f, "{}", self.open_total)?; - writeln!(f, "{}", self.close_total)?; - writeln!(f, "{}", self.connection_duration)?; - writeln!(f, "{}", self.open_connections)?; - writeln!(f, "{}", self.write_bytes_total)?; - writeln!(f, "{}", self.read_bytes_total)?; - - Ok(()) - } -} - // ===== impl Metric ===== -impl Metric { - - pub fn new(name: &'static str, help: &'static str) -> Self { - Metric { - name, - help, - values: IndexMap::new(), - } +impl<'a, M: FmtMetric> Metric<'a, M> { + /// Formats help messages for this metric. + pub fn fmt_help(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "# HELP {} {}", self.name, self.help)?; + writeln!(f, "# TYPE {} {}", self.name, M::KIND)?; + Ok(()) } -} + /// Formats a single metric without labels. + pub fn fmt_metric(&self, f: &mut fmt::Formatter, metric: M) -> fmt::Result { + metric.fmt_metric(f, self.name) + } -impl fmt::Display for Metric -where - L: fmt::Display, - L: Hash + Eq, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, - "# HELP {name} {help}\n# TYPE {name} counter\n", - name = self.name, - help = self.help, - )?; - - for (labels, value) in &self.values { - value.fmt_metric_labeled(f, self.name, labels)?; + /// Formats a single metric across labeled scopes. + pub fn fmt_scopes &M>( + &self, + f: &mut fmt::Formatter, + scopes: &Scopes, + to_metric: F + )-> fmt::Result { + for (labels, scope) in &scopes.scopes { + to_metric(scope).fmt_metric_labeled(f, self.name, labels)?; } Ok(()) } } -impl fmt::Display for Metric -where - L: fmt::Display, - L: Hash + Eq, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, - "# HELP {name} {help}\n# TYPE {name} gauge\n", - name = self.name, - help = self.help, - )?; +// ===== impl Root ===== - for (labels, value) in &self.values { - value.fmt_metric_labeled(f, self.name, labels)?; +impl Root { + metrics! { + process_start_time_seconds: Gauge { + "Time that the process started (in seconds since the UNIX epoch)" } + } + + pub fn new(process: &Arc) -> Self { + let t0 = process.start_time + .duration_since(UNIX_EPOCH) + .expect("process start time") + .as_secs(); + + Self { + start_time: t0.into(), + .. Root::default() + } + } + + fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics { + self.requests.scopes.entry(labels) + .or_insert_with(http::RequestMetrics::default) + } + + fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics { + self.responses.scopes.entry(labels) + .or_insert_with(http::ResponseMetrics::default) + } + + fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics { + self.transports.scopes.entry(labels) + .or_insert_with(transport::OpenMetrics::default) + } + + fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics { + self.transport_closes.scopes.entry(labels) + .or_insert_with(transport::CloseMetrics::default) + } +} + +impl fmt::Display for Root { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.requests.fmt(f)?; + self.responses.fmt(f)?; + self.transports.fmt(f)?; + self.transport_closes.fmt(f)?; + + Self::process_start_time_seconds.fmt_help(f)?; + Self::process_start_time_seconds.fmt_metric(f, self.start_time)?; Ok(()) } } -impl fmt::Display for Metric, L> where - L: fmt::Display, - L: Hash + Eq, - V: Into, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, - "# HELP {name} {help}\n# TYPE {name} histogram\n", - name = self.name, - help = self.help, - )?; +// ===== impl Scopes ===== - for (labels, histogram) in &self.values { - histogram.fmt_metric_labeled(f, self.name, labels)?; - } - - Ok(()) +impl Default for Scopes { + fn default() -> Self { + Scopes { scopes: IndexMap::default(), } } } diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs index 0631999b2..3ea855012 100644 --- a/proxy/src/telemetry/metrics/record.rs +++ b/proxy/src/telemetry/metrics/record.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Mutex}; use telemetry::event::Event; -use super::Metrics; +use super::Root; use super::labels::{ RequestLabels, ResponseLabels, @@ -12,18 +12,18 @@ use super::labels::{ /// Tracks Prometheus metrics #[derive(Debug)] pub struct Record { - metrics: Arc>, + metrics: Arc>, } // ===== impl Record ===== impl Record { - pub(super) fn new(metrics: &Arc>) -> Self { + pub(super) fn new(metrics: &Arc>) -> Self { Self { metrics: metrics.clone() } } #[inline] - fn update(&mut self, f: F) { + fn update(&mut self, f: F) { let mut lock = self.metrics.lock() .expect("metrics lock poisoned"); f(&mut *lock); @@ -31,71 +31,52 @@ impl Record { /// Observe the given event. pub fn record_event(&mut self, event: &Event) { - trace!("Metrics::record({:?})", event); + trace!("Root::record({:?})", event); match *event { - Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => { - // Do nothing; we'll record metrics for the request or response - // when the stream *finishes*. - }, + Event::StreamRequestOpen(_) => {}, Event::StreamRequestFail(ref req, _) => { self.update(|metrics| { - metrics.request_total(RequestLabels::new(req)).incr(); + metrics.request(RequestLabels::new(req)).end(); }) }, Event::StreamRequestEnd(ref req, _) => { self.update(|metrics| { - metrics.request_total(RequestLabels::new(req)).incr(); + metrics.request(RequestLabels::new(req)).end(); }) }, + Event::StreamResponseOpen(_, _) => {}, + Event::StreamResponseEnd(ref res, ref end) => { self.update(|metrics| { - let labels = ResponseLabels::new(res, end.grpc_status); - metrics.response_total(labels.clone()).incr(); - metrics.response_latency(labels).add(end.since_request_open); + metrics.response(ResponseLabels::new(res, end.grpc_status)) + .end(end.since_request_open); }); }, Event::StreamResponseFail(ref res, ref fail) => { + // TODO: do we care about the failure's error code here? self.update(|metrics| { - // TODO: do we care about the failure's error code here? - let labels = ResponseLabels::fail(res); - metrics.response_total(labels.clone()).incr(); - metrics.response_latency(labels).add(fail.since_request_open); + metrics.response(ResponseLabels::fail(res)).end(fail.since_request_open) }); }, Event::TransportOpen(ref ctx) => { - let labels = TransportLabels::new(ctx); self.update(|metrics| { - metrics.tcp().open_total(labels).incr(); - metrics.tcp().open_connections(labels).incr(); + metrics.transport(TransportLabels::new(ctx)).open(); }) }, Event::TransportClose(ref ctx, ref close) => { - let labels = TransportLabels::new(ctx); - let close_labels = TransportCloseLabels::new(ctx, close); self.update(|metrics| { - *metrics.tcp().write_bytes_total(labels) += close.tx_bytes as u64; - *metrics.tcp().read_bytes_total(labels) += close.rx_bytes as u64; + metrics.transport(TransportLabels::new(ctx)) + .close(close.rx_bytes, close.tx_bytes); - metrics.tcp().connection_duration(close_labels).add(close.duration); - metrics.tcp().close_total(close_labels).incr(); - - let metrics = metrics.tcp().open_connections.values.get_mut(&labels); - debug_assert!(metrics.is_some()); - match metrics { - Some(m) => { - m.decr(); - } - None => { - error!("Closed transport missing from metrics registry: {{{}}}", labels); - } - } + metrics.transport_close(TransportCloseLabels::new(ctx, close)) + .close(close.duration); }) }, }; diff --git a/proxy/src/telemetry/metrics/serve.rs b/proxy/src/telemetry/metrics/serve.rs index bae2d97c2..af940e47c 100644 --- a/proxy/src/telemetry/metrics/serve.rs +++ b/proxy/src/telemetry/metrics/serve.rs @@ -7,18 +7,18 @@ use hyper::server::{Request, Response, Service}; use std::io::Write; use std::sync::{Arc, Mutex}; -use super::Metrics; +use super::Root; /// Serve Prometheues metrics. #[derive(Debug, Clone)] pub struct Serve { - metrics: Arc>, + metrics: Arc>, } // ===== impl Serve ===== impl Serve { - pub(super) fn new(metrics: &Arc>) -> Self { + pub(super) fn new(metrics: &Arc>) -> Self { Serve { metrics: metrics.clone(), } diff --git a/proxy/src/telemetry/metrics/transport.rs b/proxy/src/telemetry/metrics/transport.rs new file mode 100644 index 000000000..f8dfb9cd2 --- /dev/null +++ b/proxy/src/telemetry/metrics/transport.rs @@ -0,0 +1,113 @@ +use std::fmt; +use std::time::Duration; + +use super::{ + latency, + Counter, + Gauge, + Histogram, + Metric, + TransportLabels, + TransportCloseLabels, + Scopes +}; + +pub(super) type OpenScopes = Scopes; + +#[derive(Debug, Default)] +pub(super) struct OpenMetrics { + open_total: Counter, + open_connections: Gauge, + write_bytes_total: Counter, + read_bytes_total: Counter, +} + +pub(super) type CloseScopes = Scopes; + +#[derive(Debug, Default)] +pub(super) struct CloseMetrics { + close_total: Counter, + connection_duration: Histogram, +} + +// ===== impl OpenScopes ===== + +impl OpenScopes { + metrics! { + tcp_open_total: Counter { "Total count of opened connections" }, + tcp_open_connections: Gauge { "Number of currently-open connections" }, + tcp_read_bytes_total: Counter { "Total count of bytes read from peers" }, + tcp_write_bytes_total: Counter { "Total count of bytes written to peers" } + } +} + +impl fmt::Display for OpenScopes { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.scopes.is_empty() { + return Ok(()); + } + + Self::tcp_open_total.fmt_help(f)?; + Self::tcp_open_total.fmt_scopes(f, &self, |s| &s.open_total)?; + + Self::tcp_open_connections.fmt_help(f)?; + Self::tcp_open_connections.fmt_scopes(f, &self, |s| &s.open_connections)?; + + Self::tcp_read_bytes_total.fmt_help(f)?; + Self::tcp_read_bytes_total.fmt_scopes(f, &self, |s| &s.read_bytes_total)?; + + Self::tcp_write_bytes_total.fmt_help(f)?; + Self::tcp_write_bytes_total.fmt_scopes(f, &self, |s| &s.write_bytes_total)?; + + Ok(()) + } +} + +// ===== impl OpenMetrics ===== + +impl OpenMetrics { + pub(super) fn open(&mut self) { + self.open_total.incr(); + self.open_connections.incr(); + } + + pub(super) fn close(&mut self, rx: u64, tx: u64) { + self.open_connections.decr(); + self.read_bytes_total += rx; + self.write_bytes_total += tx; + } +} + +// ===== impl CloseScopes ===== + +impl CloseScopes { + metrics! { + tcp_close_total: Counter { "Total count of closed connections" }, + tcp_connection_duration_ms: Histogram { "Connection lifetimes" } + } +} + +impl fmt::Display for CloseScopes { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.scopes.is_empty() { + return Ok(()); + } + + Self::tcp_close_total.fmt_help(f)?; + Self::tcp_close_total.fmt_scopes(f, &self, |s| &s.close_total)?; + + Self::tcp_connection_duration_ms.fmt_help(f)?; + Self::tcp_connection_duration_ms.fmt_scopes(f, &self, |s| &s.connection_duration)?; + + Ok(()) + } +} + +// ===== impl CloseMetrics ===== + +impl CloseMetrics { + pub(super) fn close(&mut self, duration: Duration) { + self.close_total.incr(); + self.connection_duration.add(duration); + } +}