diff --git a/proxy/src/telemetry/metrics/counter.rs b/proxy/src/telemetry/metrics/counter.rs new file mode 100644 index 000000000..b9d086fce --- /dev/null +++ b/proxy/src/telemetry/metrics/counter.rs @@ -0,0 +1,60 @@ +use std::{fmt, ops}; +use std::num::Wrapping; + +/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int. +/// +/// Counters always explicitly wrap on overflows rather than panicking in +/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks +/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less +/// problematic than panicking in this case. +/// +/// Note, however, that Prometheus represents counters using 64-bit +/// floating-point numbers. The correct semantics are to ensure the counter +/// always gets reset to zero after Prometheus reads it, before it would ever +/// overflow a 52-bit `f64` mantissa. +/// +/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate() +/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate() +/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets +/// +// TODO: Implement Prometheus reset semantics correctly, taking into +// consideration that Prometheus models counters as `f64` and so +// there are only 52 significant bits. +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +pub struct Counter(Wrapping); + +// ===== impl Counter ===== + +impl Counter { + /// Increment the counter by one. + /// + /// This function wraps on overflows. + pub fn incr(&mut self) { + (*self).0 += Wrapping(1); + } +} + +impl Into for Counter { + fn into(self) -> u64 { + (self.0).0 + } +} + +impl ops::Add for Counter { + type Output = Self; + fn add(self, Counter(rhs): Self) -> Self::Output { + Counter(self.0 + rhs) + } +} + +impl ops::AddAssign for Counter { + fn add_assign(&mut self, rhs: u64) { + (*self).0 += Wrapping(rhs) + } +} + +impl fmt::Display for Counter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/proxy/src/telemetry/metrics/gauge.rs b/proxy/src/telemetry/metrics/gauge.rs new file mode 100644 index 000000000..150dacff2 --- /dev/null +++ b/proxy/src/telemetry/metrics/gauge.rs @@ -0,0 +1,43 @@ +use std::fmt; + +/// An instaneous metric value. +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +pub struct Gauge(u64); + +impl Gauge { + /// Increment the gauge by one. + pub fn incr(&mut self) { + if let Some(new_value) = self.0.checked_add(1) { + (*self).0 = new_value; + } else { + warn!("Gauge overflow"); + } + } + + /// Decrement the gauge by one. + pub fn decr(&mut self) { + if let Some(new_value) = self.0.checked_sub(1) { + (*self).0 = new_value; + } else { + warn!("Gauge underflow"); + } + } +} + +impl From for Gauge { + fn from(n: u64) -> Self { + Gauge(n) + } +} + +impl Into for Gauge { + fn into(self) -> u64 { + self.0 + } +} + +impl fmt::Display for Gauge { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index 703df0604..6e1bc2c5f 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -29,10 +29,8 @@ use std::default::Default; use std::{fmt, time}; use std::hash::Hash; -use std::num::Wrapping; use std::sync::{Arc, Mutex}; use std::io::Write; -use std::ops; use deflate::CompressionOptions; use deflate::write::GzEncoder; @@ -49,9 +47,13 @@ use indexmap::{IndexMap}; use ctx; use telemetry::event::Event; +mod counter; +mod gauge; mod labels; mod latency; +use self::counter::Counter; +use self::gauge::Gauge; use self::labels::{ RequestLabels, ResponseLabels, @@ -92,32 +94,6 @@ struct Metric { values: IndexMap } -/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int. -/// -/// Counters always explicitly wrap on overflows rather than panicking in -/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks -/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less -/// problematic than panicking in this case. -/// -/// Note, however, that Prometheus represents counters using 64-bit -/// floating-point numbers. The correct semantics are to ensure the counter -/// always gets reset to zero after Prometheus reads it, before it would ever -/// overflow a 52-bit `f64` mantissa. -/// -/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate() -/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate() -/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets -/// -// TODO: Implement Prometheus reset semantics correctly, taking into -// consideration that Prometheus models counters as `f64` and so -// there are only 52 significant bits. -#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] -pub struct Counter(Wrapping); - -/// A Prometheus gauge -#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] -pub struct Gauge(u64); - /// Tracks Prometheus metrics #[derive(Debug)] pub struct Aggregate { @@ -316,76 +292,6 @@ impl fmt::Display for TcpMetrics { } } -// ===== impl Counter ===== - -impl fmt::Display for Counter { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", (self.0).0 as f64) - } -} - -impl Into for Counter { - fn into(self) -> u64 { - (self.0).0 as u64 - } -} - -impl ops::Add for Counter { - type Output = Self; - fn add(self, Counter(rhs): Self) -> Self::Output { - Counter(self.0 + rhs) - } -} - -impl ops::AddAssign for Counter { - fn add_assign(&mut self, rhs: u64) { - (*self).0 += Wrapping(rhs) - } -} - - -impl Counter { - - /// Increment the counter by one. - /// - /// This function wraps on overflows. - pub fn incr(&mut self) -> &mut Self { - (*self).0 += Wrapping(1); - self - } -} - -// ===== impl Gauge ===== - -impl fmt::Display for Gauge { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Gauge { - /// Increment the gauge by one. - pub fn incr(&mut self) -> &mut Self { - if let Some(new_value) = self.0.checked_add(1) { - (*self).0 = new_value; - } else { - warn!("Gauge::incr() would wrap!"); - } - self - } - - /// Decrement the gauge by one. - pub fn decr(&mut self) -> &mut Self { - if let Some(new_value) = self.0.checked_sub(1) { - (*self).0 = new_value; - } else { - warn!("Gauge::decr() called on a gauge with value 0"); - } - self - } - -} - // ===== impl Metric ===== impl Metric { @@ -527,14 +433,14 @@ impl Aggregate { Event::StreamRequestFail(ref req, _) => { let labels = Arc::new(RequestLabels::new(req)); self.update(|metrics| { - *metrics.request_total(&labels).incr(); + metrics.request_total(&labels).incr(); }) }, Event::StreamRequestEnd(ref req, _) => { let labels = Arc::new(RequestLabels::new(req)); self.update(|metrics| { - *metrics.request_total(&labels).incr(); + metrics.request_total(&labels).incr(); }) }, @@ -544,7 +450,7 @@ impl Aggregate { end.grpc_status, )); self.update(|metrics| { - *metrics.response_total(&labels).incr(); + metrics.response_total(&labels).incr(); *metrics.response_latency(&labels) += end.since_request_open; }); }, @@ -553,7 +459,7 @@ impl Aggregate { // TODO: do we care about the failure's error code here? let labels = Arc::new(ResponseLabels::fail(res)); self.update(|metrics| { - *metrics.response_total(&labels).incr(); + metrics.response_total(&labels).incr(); *metrics.response_latency(&labels) += fail.since_request_open; }); }, @@ -561,8 +467,8 @@ impl Aggregate { Event::TransportOpen(ref ctx) => { let labels = Arc::new(TransportLabels::new(ctx)); self.update(|metrics| { - *metrics.tcp().open_total(&labels).incr(); - *metrics.tcp().open_connections(&labels).incr(); + metrics.tcp().open_total(&labels).incr(); + metrics.tcp().open_connections(&labels).incr(); }) }, @@ -574,13 +480,13 @@ impl Aggregate { *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64; *metrics.tcp().connection_duration(&close_labels) += close.duration; - *metrics.tcp().close_total(&close_labels).incr(); + metrics.tcp().close_total(&close_labels).incr(); let metrics = metrics.tcp().open_connections.values.get_mut(&labels); debug_assert!(metrics.is_some()); match metrics { Some(m) => { - *m.decr(); + m.decr(); } None => { error!("Closed transport missing from metrics registry: {{{}}}", labels);