Rename `metrics::Aggregate` to `metrics::Record` (#875)

Move `Record` into its own file.
Oliver Gould 2018-04-28 15:35:29 -07:00 committed by GitHub
parent 9c70310406
commit 64a3bb09b2
3 changed files with 121 additions and 114 deletions

View File

@@ -33,8 +33,8 @@ pub struct MakeControl {
 /// Limit the amount of memory that may be consumed for metrics aggregation.
 #[derive(Debug)]
 pub struct Control {
-    /// Aggregates scrapable metrics.
-    metrics_aggregate: metrics::Aggregate,
+    /// Records telemetry events.
+    metrics_record: metrics::Record,
     /// Serves scrapable metrics.
     metrics_service: metrics::Serve,
@@ -76,11 +76,11 @@ impl MakeControl {
     /// - `Ok(())` if the timeout was successfully created.
     /// - `Err(io::Error)` if the timeout could not be created.
     pub fn make_control(self, taps: &Arc<Mutex<Taps>>, handle: &Handle) -> io::Result<Control> {
-        let (metrics_aggregate, metrics_service) =
+        let (metrics_record, metrics_service) =
             metrics::new(&self.process_ctx);
 
         Ok(Control {
-            metrics_aggregate,
+            metrics_record,
             metrics_service,
             rx: Some(self.rx),
             taps: Some(taps.clone()),
@@ -146,7 +146,7 @@ impl Future for Control {
                     }
                 }
-                self.metrics_aggregate.record_event(&ev);
+                self.metrics_record.record_event(&ev);
             }
             None => {
                 debug!("events finished");

View File

@@ -1,4 +1,4 @@
-//! Aggregates and serves Prometheus metrics.
+//! Records and serves Prometheus metrics.
 //!
 //! # A note on label formatting
 //!
@@ -42,16 +42,16 @@ use hyper::server::{
     Request as HyperRequest,
     Service as HyperService,
 };
-use indexmap::{IndexMap};
+use indexmap::IndexMap;
 
 use ctx;
-use telemetry::event::Event;
 
 mod counter;
 mod gauge;
 mod histogram;
 mod labels;
 mod latency;
+mod record;
 
 use self::counter::Counter;
 use self::gauge::Gauge;
@@ -63,6 +63,7 @@ use self::labels::{
     TransportCloseLabels
 };
 pub use self::labels::DstLabels;
+pub use self::record::Record;
 
 #[derive(Debug, Clone)]
 struct Metrics {
@@ -95,12 +96,6 @@ struct Metric<M, L: Hash + Eq> {
     values: IndexMap<L, M>
 }
 
-/// Tracks Prometheus metrics
-#[derive(Debug)]
-pub struct Aggregate {
-    metrics: Arc<Mutex<Metrics>>,
-}
-
 /// Serve Prometheues metrics.
 #[derive(Debug, Clone)]
 pub struct Serve {
@@ -109,13 +104,13 @@ pub struct Serve {
 /// Construct the Prometheus metrics.
 ///
-/// Returns the `Aggregate` and `Serve` sides. The `Serve` side
+/// Returns the `Record` and `Serve` sides. The `Serve` side
 /// is a Hyper service which can be used to create the server for the
-/// scrape endpoint, while the `Aggregate` side can receive updates to the
+/// scrape endpoint, while the `Record` side can receive updates to the
 /// metrics by calling `record_event`.
-pub fn new(process: &Arc<ctx::Process>) -> (Aggregate, Serve){
+pub fn new(process: &Arc<ctx::Process>) -> (Record, Serve){
     let metrics = Arc::new(Mutex::new(Metrics::new(process)));
-    (Aggregate::new(&metrics), Serve::new(&metrics))
+    (Record::new(&metrics), Serve::new(&metrics))
 }
 
 // ===== impl Metrics =====
@@ -399,102 +394,6 @@ impl<L, V> fmt::Display for Metric<Histogram<V>, L> where
     }
 }
 
-// ===== impl Aggregate =====
-
-impl Aggregate {
-
-    fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
-        Aggregate {
-            metrics: metrics.clone(),
-        }
-    }
-
-    #[inline]
-    fn update<F: Fn(&mut Metrics)>(&mut self, f: F) {
-        let mut lock = self.metrics.lock()
-            .expect("metrics lock poisoned");
-        f(&mut *lock);
-    }
-
-    /// Observe the given event.
-    pub fn record_event(&mut self, event: &Event) {
-        trace!("Metrics::record({:?})", event);
-        match *event {
-
-            Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
-                // Do nothing; we'll record metrics for the request or response
-                // when the stream *finishes*.
-            },
-
-            Event::StreamRequestFail(ref req, _) => {
-                let labels = Arc::new(RequestLabels::new(req));
-                self.update(|metrics| {
-                    metrics.request_total(&labels).incr();
-                })
-            },
-
-            Event::StreamRequestEnd(ref req, _) => {
-                let labels = Arc::new(RequestLabels::new(req));
-                self.update(|metrics| {
-                    metrics.request_total(&labels).incr();
-                })
-            },
-
-            Event::StreamResponseEnd(ref res, ref end) => {
-                let labels = Arc::new(ResponseLabels::new(
-                    res,
-                    end.grpc_status,
-                ));
-                self.update(|metrics| {
-                    metrics.response_total(&labels).incr();
-                    metrics.response_latency(&labels).add(end.since_request_open);
-                });
-            },
-
-            Event::StreamResponseFail(ref res, ref fail) => {
-                // TODO: do we care about the failure's error code here?
-                let labels = Arc::new(ResponseLabels::fail(res));
-                self.update(|metrics| {
-                    metrics.response_total(&labels).incr();
-                    metrics.response_latency(&labels).add(fail.since_request_open);
-                });
-            },
-
-            Event::TransportOpen(ref ctx) => {
-                let labels = Arc::new(TransportLabels::new(ctx));
-                self.update(|metrics| {
-                    metrics.tcp().open_total(&labels).incr();
-                    metrics.tcp().open_connections(&labels).incr();
-                })
-            },
-
-            Event::TransportClose(ref ctx, ref close) => {
-                let labels = Arc::new(TransportLabels::new(ctx));
-                let close_labels = Arc::new(TransportCloseLabels::new(ctx, close));
-                self.update(|metrics| {
-                    *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64;
-                    *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
-                    metrics.tcp().connection_duration(&close_labels).add(close.duration);
-                    metrics.tcp().close_total(&close_labels).incr();
-                    let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
-                    debug_assert!(metrics.is_some());
-                    match metrics {
-                        Some(m) => {
-                            m.decr();
-                        }
-                        None => {
-                            error!("Closed transport missing from metrics registry: {{{}}}", labels);
-                        }
-                    }
-                })
-            },
-        };
-    }
-}
 
 // ===== impl Serve =====
 
 impl Serve {
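The `new` constructor in this file hands back two halves over one shared registry: `Record` mutates the `Arc<Mutex<Metrics>>` as events arrive, while `Serve` reads it to answer Prometheus scrapes. Below is a minimal sketch of that split, assuming a stand-in `Metrics` with a single counter rather than the proxy's labeled metric types:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the proxy's metrics registry.
#[derive(Debug, Default)]
struct Metrics {
    request_total: u64,
}

/// Write half: mutates the shared registry as events arrive.
struct Record(Arc<Mutex<Metrics>>);

/// Read half: renders the registry for a scrape.
struct Serve(Arc<Mutex<Metrics>>);

fn new() -> (Record, Serve) {
    let metrics = Arc::new(Mutex::new(Metrics::default()));
    (Record(metrics.clone()), Serve(metrics))
}

impl Record {
    fn record_request(&mut self) {
        self.0.lock().expect("metrics lock poisoned").request_total += 1;
    }
}

impl Serve {
    fn render(&self) -> String {
        let m = self.0.lock().expect("metrics lock poisoned");
        format!("request_total {}\n", m.request_total)
    }
}

fn main() {
    let (mut record, serve) = new();
    record.record_request();
    print!("{}", serve.render());
}
```

Cloning the `Arc` is what lets the two handles live on different tasks while observing the same registry.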

View File

@@ -0,0 +1,108 @@
+use std::sync::{Arc, Mutex};
+
+use telemetry::event::Event;
+
+use super::Metrics;
+use super::labels::{
+    RequestLabels,
+    ResponseLabels,
+    TransportLabels,
+    TransportCloseLabels
+};
+
+/// Tracks Prometheus metrics
+#[derive(Debug)]
+pub struct Record {
+    metrics: Arc<Mutex<Metrics>>,
+}
+
+// ===== impl Record =====
+
+impl Record {
+
+    pub(super) fn new(metrics: &Arc<Mutex<Metrics>>) -> Self {
+        Self { metrics: metrics.clone() }
+    }
+
+    #[inline]
+    fn update<F: Fn(&mut Metrics)>(&mut self, f: F) {
+        let mut lock = self.metrics.lock()
+            .expect("metrics lock poisoned");
+        f(&mut *lock);
+    }
+
+    /// Observe the given event.
+    pub fn record_event(&mut self, event: &Event) {
+        trace!("Metrics::record({:?})", event);
+        match *event {
+
+            Event::StreamRequestOpen(_) | Event::StreamResponseOpen(_, _) => {
+                // Do nothing; we'll record metrics for the request or response
+                // when the stream *finishes*.
+            },
+
+            Event::StreamRequestFail(ref req, _) => {
+                let labels = Arc::new(RequestLabels::new(req));
+                self.update(|metrics| {
+                    metrics.request_total(&labels).incr();
+                })
+            },
+
+            Event::StreamRequestEnd(ref req, _) => {
+                let labels = Arc::new(RequestLabels::new(req));
+                self.update(|metrics| {
+                    metrics.request_total(&labels).incr();
+                })
+            },
+
+            Event::StreamResponseEnd(ref res, ref end) => {
+                let labels = Arc::new(ResponseLabels::new(
+                    res,
+                    end.grpc_status,
+                ));
+                self.update(|metrics| {
+                    metrics.response_total(&labels).incr();
+                    metrics.response_latency(&labels).add(end.since_request_open);
+                });
+            },
+
+            Event::StreamResponseFail(ref res, ref fail) => {
+                // TODO: do we care about the failure's error code here?
+                let labels = Arc::new(ResponseLabels::fail(res));
+                self.update(|metrics| {
+                    metrics.response_total(&labels).incr();
+                    metrics.response_latency(&labels).add(fail.since_request_open);
+                });
+            },
+
+            Event::TransportOpen(ref ctx) => {
+                let labels = Arc::new(TransportLabels::new(ctx));
+                self.update(|metrics| {
+                    metrics.tcp().open_total(&labels).incr();
+                    metrics.tcp().open_connections(&labels).incr();
+                })
+            },
+
+            Event::TransportClose(ref ctx, ref close) => {
+                let labels = Arc::new(TransportLabels::new(ctx));
+                let close_labels = Arc::new(TransportCloseLabels::new(ctx, close));
+                self.update(|metrics| {
+                    *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64;
+                    *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
+                    metrics.tcp().connection_duration(&close_labels).add(close.duration);
+                    metrics.tcp().close_total(&close_labels).incr();
+                    let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
+                    debug_assert!(metrics.is_some());
+                    match metrics {
+                        Some(m) => {
+                            m.decr();
+                        }
+                        None => {
+                            error!("Closed transport missing from metrics registry: {{{}}}", labels);
+                        }
+                    }
+                })
+            },
+        };
+    }
+}
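The `update` helper above is the only place `Record` touches the lock: each event handler passes a closure, the lock is held just long enough to apply it, and a poisoned lock is treated as fatal. Here is a standalone sketch of that pattern, assuming a hypothetical `Registry`/`Recorder` pair and a plain integer gauge in place of the labeled `open_connections` map:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical registry type; the proxy's `Metrics` holds labeled
// counters, gauges, and histograms instead of one plain field.
#[derive(Debug, Default)]
struct Registry {
    open_connections: i64,
}

struct Recorder {
    registry: Arc<Mutex<Registry>>,
}

impl Recorder {
    // Lock once, apply the caller's mutation, and release on return.
    fn update<F: Fn(&mut Registry)>(&mut self, f: F) {
        let mut lock = self.registry.lock().expect("registry lock poisoned");
        f(&mut *lock);
    }

    fn transport_open(&mut self) {
        self.update(|r| r.open_connections += 1);
    }

    fn transport_close(&mut self) {
        self.update(|r| {
            // Mirror the defensive decrement above: tolerate a missing or
            // zeroed entry and report it rather than underflowing.
            if r.open_connections > 0 {
                r.open_connections -= 1;
            } else {
                eprintln!("close recorded with no open connections");
            }
        });
    }
}

fn main() {
    let mut rec = Recorder {
        registry: Arc::new(Mutex::new(Registry::default())),
    };
    rec.transport_open();
    rec.transport_close();
    println!("{:?}", rec.registry.lock().expect("registry lock poisoned"));
}
```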