From 64a3bb09b25ac9cf88452ec8945ad520757e8c6f Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 28 Apr 2018 15:35:29 -0700 Subject: [PATCH] Rename `metrics::Aggregate` to `metrics::Record` (#875) Move `Record` into its own file. --- proxy/src/telemetry/control.rs | 10 +-- proxy/src/telemetry/metrics/mod.rs | 117 ++------------------------ proxy/src/telemetry/metrics/record.rs | 108 ++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 114 deletions(-) create mode 100644 proxy/src/telemetry/metrics/record.rs diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index f879bba87..97f125b11 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -33,8 +33,8 @@ pub struct MakeControl { /// Limit the amount of memory that may be consumed for metrics aggregation. #[derive(Debug)] pub struct Control { - /// Aggregates scrapable metrics. - metrics_aggregate: metrics::Aggregate, + /// Records telemetry events. + metrics_record: metrics::Record, /// Serves scrapable metrics. metrics_service: metrics::Serve, @@ -76,11 +76,11 @@ impl MakeControl { /// - `Ok(())` if the timeout was successfully created. /// - `Err(io::Error)` if the timeout could not be created. pub fn make_control(self, taps: &Arc>, handle: &Handle) -> io::Result { - let (metrics_aggregate, metrics_service) = + let (metrics_record, metrics_service) = metrics::new(&self.process_ctx); Ok(Control { - metrics_aggregate, + metrics_record, metrics_service, rx: Some(self.rx), taps: Some(taps.clone()), @@ -146,7 +146,7 @@ impl Future for Control { } } - self.metrics_aggregate.record_event(&ev); + self.metrics_record.record_event(&ev); } None => { debug!("events finished"); diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index effe59b87..1b48d3e65 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -1,4 +1,4 @@ -//! Aggregates and serves Prometheus metrics. +//! Records and serves Prometheus metrics. //! //! # A note on label formatting //! @@ -42,16 +42,16 @@ use hyper::server::{ Request as HyperRequest, Service as HyperService, }; -use indexmap::{IndexMap}; +use indexmap::IndexMap; use ctx; -use telemetry::event::Event; mod counter; mod gauge; mod histogram; mod labels; mod latency; +mod record; use self::counter::Counter; use self::gauge::Gauge; @@ -63,6 +63,7 @@ use self::labels::{ TransportCloseLabels }; pub use self::labels::DstLabels; +pub use self::record::Record; #[derive(Debug, Clone)] struct Metrics { @@ -95,12 +96,6 @@ struct Metric { values: IndexMap } -/// Tracks Prometheus metrics -#[derive(Debug)] -pub struct Aggregate { - metrics: Arc>, -} - /// Serve Prometheues metrics. #[derive(Debug, Clone)] pub struct Serve { @@ -109,13 +104,13 @@ pub struct Serve { /// Construct the Prometheus metrics. /// -/// Returns the `Aggregate` and `Serve` sides. The `Serve` side +/// Returns the `Record` and `Serve` sides. The `Serve` side /// is a Hyper service which can be used to create the server for the -/// scrape endpoint, while the `Aggregate` side can receive updates to the +/// scrape endpoint, while the `Record` side can receive updates to the /// metrics by calling `record_event`. -pub fn new(process: &Arc) -> (Aggregate, Serve){ +pub fn new(process: &Arc) -> (Record, Serve){ let metrics = Arc::new(Mutex::new(Metrics::new(process))); - (Aggregate::new(&metrics), Serve::new(&metrics)) + (Record::new(&metrics), Serve::new(&metrics)) } // ===== impl Metrics ===== @@ -399,102 +394,6 @@ impl fmt::Display for Metric, L> where } } -// ===== impl Aggregate ===== - -impl Aggregate { - - fn new(metrics: &Arc>) -> Self { - Aggregate { - metrics: metrics.clone(), - } - } - - #[inline] - fn update(&mut self, f: F) { - let mut lock = self.metrics.lock() - .expect("metrics lock poisoned"); - f(&mut *lock); - } - - /// Observe the given event. - pub fn record_event(&mut self, event: &Event) { - trace!("Metrics::record({:?})", event); - match *event { - - Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => { - // Do nothing; we'll record metrics for the request or response - // when the stream *finishes*. - }, - - Event::StreamRequestFail(ref req, _) => { - let labels = Arc::new(RequestLabels::new(req)); - self.update(|metrics| { - metrics.request_total(&labels).incr(); - }) - }, - - Event::StreamRequestEnd(ref req, _) => { - let labels = Arc::new(RequestLabels::new(req)); - self.update(|metrics| { - metrics.request_total(&labels).incr(); - }) - }, - - Event::StreamResponseEnd(ref res, ref end) => { - let labels = Arc::new(ResponseLabels::new( - res, - end.grpc_status, - )); - self.update(|metrics| { - metrics.response_total(&labels).incr(); - metrics.response_latency(&labels).add(end.since_request_open); - }); - }, - - Event::StreamResponseFail(ref res, ref fail) => { - // TODO: do we care about the failure's error code here? - let labels = Arc::new(ResponseLabels::fail(res)); - self.update(|metrics| { - metrics.response_total(&labels).incr(); - metrics.response_latency(&labels).add(fail.since_request_open); - }); - }, - - Event::TransportOpen(ref ctx) => { - let labels = Arc::new(TransportLabels::new(ctx)); - self.update(|metrics| { - metrics.tcp().open_total(&labels).incr(); - metrics.tcp().open_connections(&labels).incr(); - }) - }, - - Event::TransportClose(ref ctx, ref close) => { - let labels = Arc::new(TransportLabels::new(ctx)); - let close_labels = Arc::new(TransportCloseLabels::new(ctx, close)); - self.update(|metrics| { - *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64; - *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64; - - metrics.tcp().connection_duration(&close_labels).add(close.duration); - metrics.tcp().close_total(&close_labels).incr(); - - let metrics = metrics.tcp().open_connections.values.get_mut(&labels); - debug_assert!(metrics.is_some()); - match metrics { - Some(m) => { - m.decr(); - } - None => { - error!("Closed transport missing from metrics registry: {{{}}}", labels); - } - } - }) - }, - }; - } -} - - // ===== impl Serve ===== impl Serve { diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs new file mode 100644 index 000000000..3d9d4e46a --- /dev/null +++ b/proxy/src/telemetry/metrics/record.rs @@ -0,0 +1,108 @@ +use std::sync::{Arc, Mutex}; + +use telemetry::event::Event; +use super::Metrics; +use super::labels::{ + RequestLabels, + ResponseLabels, + TransportLabels, + TransportCloseLabels +}; + +/// Tracks Prometheus metrics +#[derive(Debug)] +pub struct Record { + metrics: Arc>, +} + +// ===== impl Record ===== + +impl Record { + pub(super) fn new(metrics: &Arc>) -> Self { + Self { metrics: metrics.clone() } + } + + #[inline] + fn update(&mut self, f: F) { + let mut lock = self.metrics.lock() + .expect("metrics lock poisoned"); + f(&mut *lock); + } + + /// Observe the given event. + pub fn record_event(&mut self, event: &Event) { + trace!("Metrics::record({:?})", event); + match *event { + + Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => { + // Do nothing; we'll record metrics for the request or response + // when the stream *finishes*. + }, + + Event::StreamRequestFail(ref req, _) => { + let labels = Arc::new(RequestLabels::new(req)); + self.update(|metrics| { + metrics.request_total(&labels).incr(); + }) + }, + + Event::StreamRequestEnd(ref req, _) => { + let labels = Arc::new(RequestLabels::new(req)); + self.update(|metrics| { + metrics.request_total(&labels).incr(); + }) + }, + + Event::StreamResponseEnd(ref res, ref end) => { + let labels = Arc::new(ResponseLabels::new( + res, + end.grpc_status, + )); + self.update(|metrics| { + metrics.response_total(&labels).incr(); + metrics.response_latency(&labels).add(end.since_request_open); + }); + }, + + Event::StreamResponseFail(ref res, ref fail) => { + // TODO: do we care about the failure's error code here? + let labels = Arc::new(ResponseLabels::fail(res)); + self.update(|metrics| { + metrics.response_total(&labels).incr(); + metrics.response_latency(&labels).add(fail.since_request_open); + }); + }, + + Event::TransportOpen(ref ctx) => { + let labels = Arc::new(TransportLabels::new(ctx)); + self.update(|metrics| { + metrics.tcp().open_total(&labels).incr(); + metrics.tcp().open_connections(&labels).incr(); + }) + }, + + Event::TransportClose(ref ctx, ref close) => { + let labels = Arc::new(TransportLabels::new(ctx)); + let close_labels = Arc::new(TransportCloseLabels::new(ctx, close)); + self.update(|metrics| { + *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64; + *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64; + + metrics.tcp().connection_duration(&close_labels).add(close.duration); + metrics.tcp().close_total(&close_labels).incr(); + + let metrics = metrics.tcp().open_connections.values.get_mut(&labels); + debug_assert!(metrics.is_some()); + match metrics { + Some(m) => { + m.decr(); + } + None => { + error!("Closed transport missing from metrics registry: {{{}}}", labels); + } + } + }) + }, + }; + } +}