diff --git a/proxy/src/telemetry/metrics/latency.rs b/proxy/src/telemetry/metrics/latency.rs
index fab4fa4f1..cb16bf0f4 100644
--- a/proxy/src/telemetry/metrics/latency.rs
+++ b/proxy/src/telemetry/metrics/latency.rs
@@ -1,8 +1,10 @@
 #![deny(missing_docs)]
-use std::{fmt, ops, slice, u32};
-use std::default::Default;
+use std::{fmt, iter, ops, slice, u32};
+use std::num::Wrapping;
 use std::time::Duration;
 
+use super::prometheus;
+
 /// The number of buckets in a latency histogram.
 pub const NUM_BUCKETS: usize = 26;
 
@@ -56,16 +58,35 @@ pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [
 ];
 
 /// A series of latency values and counts.
-#[derive(Debug, Clone)]
+#[derive(Debug, Default, Clone)]
 pub struct Histogram {
     /// Array of buckets in which to count latencies.
     ///
     /// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
-    buckets: [u32; NUM_BUCKETS],
+    buckets: [prometheus::Counter; NUM_BUCKETS],
 
     /// The total sum of all observed latency values.
-    pub sum: u64,
+    ///
+    /// Histogram sums always explicitly wrap on overflows rather than
+    /// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
+    /// queries handle breaks in monotonicity gracefully (see also
+    /// [`resets()`]), so wrapping is less problematic than panicking in this
+    /// case.
+    ///
+    /// Note, however, that Prometheus actually represents this using 64-bit
+    /// floating-point numbers. The correct semantics are to ensure the sum
+    /// always gets reset to zero after Prometheus reads it, before it would
+    /// ever overflow a 52-bit `f64` mantissa.
+    ///
+    /// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
+    /// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
+    /// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
+    ///
+    // TODO: Implement Prometheus reset semantics correctly, taking into
+    //       consideration that Prometheus represents this as `f64` and so
+    //       there are only 52 significant bits.
+    pub sum: Wrapping<u64>,
 }
 
 /// A latency in tenths of a millisecond.
@@ -87,41 +108,8 @@ impl Histogram {
             .position(|max| &measurement <= max)
             .expect("latency value greater than u32::MAX; this shouldn't be \
                      possible.");
-        self.buckets[i] += 1;
-
-        // It's time to play ~*Will It Overflow???*~
-        //
-        // If we make the fairly generous assumptions of 1-minute latencies
-        // and 1 million RPS per set of metric labels (i.e. per pod), that
-        // gives us:
-        //        600,000 (1 minute = 600,000 tenths-of-milliseconds)
-        //    x 1,000,000 (1 million RPS)
-        //    ---------------
-        //    600,000,000,000 (6e11) gain per second
-        //
-        // times the number of seconds in a day (86,400):
-        //    6e11 x 86400 = 5.184e16
-        //
-        // 18,446,744,073,709,551,615 is the maximum 64-bit unsigned integer.
-        // 1.8446744073709551615e19 / 5.184e16 = 355 (about 1 year)
-        //
-        // So at 1 million RPS with 1-minute latencies, the sum will overflow
-        // in about a year. We don't really expect a conduit-proxy process to
-        // run that long (or see this kind of load), but we can revisit this
-        // if supporting extremely long-running deployments becomes a priority.
-        //
-        // (N.B. that storing latencies in whole milliseconds rather than tenths
-        // of milliseconds would change the time to overflow to almost 10
-        // years.)
-        self.sum += measurement.0 as u64;
-    }
-
-    /// Construct a new, empty `Histogram`.
-    pub fn new() -> Self {
-        Histogram {
-            buckets: [0; NUM_BUCKETS],
-            sum: 0,
-        }
+        self.buckets[i].incr();
+        self.sum += Wrapping(measurement.0 as u64);
     }
 
     /// Return the sum value of this histogram in milliseconds.
@@ -130,7 +118,7 @@ impl Histogram {
     /// internally recorded in tenths of milliseconds, which could
     /// represent a number of milliseconds with a fractional part.
     pub fn sum_in_ms(&self) -> f64 {
-        self.sum as f64 / MS_TO_TENTHS_OF_MS as f64
+        self.sum.0 as f64 / MS_TO_TENTHS_OF_MS as f64
     }
 }
 
@@ -146,25 +134,19 @@ where
 }
 
-
 impl<'a> IntoIterator for &'a Histogram {
-    type Item = &'a u32;
-    type IntoIter = slice::Iter<'a, u32>;
+    type Item = u64;
+    type IntoIter = iter::Map<
+        slice::Iter<'a, prometheus::Counter>,
+        fn(&'a prometheus::Counter) -> u64
+    >;
 
     fn into_iter(self) -> Self::IntoIter {
-        self.buckets.iter()
+        self.buckets.iter().map(|&count| count.into())
     }
 }
 
-
-impl Default for Histogram {
-    #[inline]
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 // ===== impl Latency =====
diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs
index 9ba5f31f4..b7a7b2e4a 100644
--- a/proxy/src/telemetry/metrics/mod.rs
+++ b/proxy/src/telemetry/metrics/mod.rs
@@ -251,7 +251,15 @@ impl Metrics {
                 ends: ends,
                 response_latency_counts: res_stats.latencies
                     .into_iter()
-                    .map(|l| *l)
+                    // NOTE: this potential truncation is unlikely to cause
+                    //       problems here, as the push metrics reports have
+                    //       different semantics from the scrapable Prometheus
+                    //       metrics. Push metrics are reset every time a report
+                    //       is generated, while the scrapable metrics last for
+                    //       the entire lifetime of the process.
+                    //
+                    // Furthermore, this code is slated for removal soon.
+                    .map(|count| count as u32)
                     .collect(),
             });
         }
diff --git a/proxy/src/telemetry/metrics/prometheus.rs b/proxy/src/telemetry/metrics/prometheus.rs
index 66f6a15c1..6fb75b09a 100644
--- a/proxy/src/telemetry/metrics/prometheus.rs
+++ b/proxy/src/telemetry/metrics/prometheus.rs
@@ -1,6 +1,7 @@
 use std::default::Default;
-use std::{fmt, u32, time};
+use std::{fmt, ops, time, u32};
 use std::hash::Hash;
+use std::num::Wrapping;
 use std::sync::{Arc, Mutex};
 
 use futures::future::{self, FutureResult};
@@ -38,8 +39,27 @@ struct Metric {
     values: IndexMap
 }
 
+/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
+///
+/// Counters always explicitly wrap on overflows rather than panicking in
+/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
+/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
+/// problematic than panicking in this case.
+///
+/// Note, however, that Prometheus represents counters using 64-bit
+/// floating-point numbers. The correct semantics are to ensure the counter
+/// always gets reset to zero after Prometheus reads it, before it would ever
+/// overflow a 52-bit `f64` mantissa.
+///
+/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
+/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
+/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
+///
+// TODO: Implement Prometheus reset semantics correctly, taking into
+//       consideration that Prometheus models counters as `f64` and so
+//       there are only 52 significant bits.
 #[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
-struct Counter(u64);
+pub struct Counter(Wrapping<u64>);
 
 /// Tracks Prometheus metrics
 #[derive(Debug)]
@@ -47,7 +67,6 @@ pub struct Aggregate {
     metrics: Arc<Mutex<Metrics>>,
 }
 
-
 /// Serve Prometheues metrics.
 #[derive(Debug, Clone)]
 pub struct Serve {
@@ -171,10 +190,12 @@ impl Metrics {
         }
     }
 
-    fn request_total(&mut self, labels: &Arc<RequestLabels>) -> &mut u64 {
-        &mut self.request_total.values
+    fn request_total(&mut self,
+                     labels: &Arc<RequestLabels>)
+                     -> &mut Counter {
+        self.request_total.values
             .entry(labels.clone())
-            .or_insert_with(Default::default).0
+            .or_insert_with(Default::default)
     }
 
     fn request_duration(&mut self,
@@ -201,10 +222,12 @@ impl Metrics {
             .or_insert_with(Default::default)
     }
 
-    fn response_total(&mut self, labels: &Arc<ResponseLabels>) -> &mut u64 {
-        &mut self.response_total.values
+    fn response_total(&mut self,
+                      labels: &Arc<ResponseLabels>)
+                      -> &mut Counter {
+        self.response_total.values
             .entry(labels.clone())
-            .or_insert_with(Default::default).0
+            .or_insert_with(Default::default)
     }
 }
 
@@ -222,9 +245,36 @@ impl fmt::Display for Metrics {
     }
 }
 
+
+// ===== impl Counter =====
+
 impl fmt::Display for Counter {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", self.0 as f64)
+        write!(f, "{}", (self.0).0 as f64)
+    }
+}
+
+impl Into<u64> for Counter {
+    fn into(self) -> u64 {
+        (self.0).0 as u64
+    }
+}
+
+impl ops::Add for Counter {
+    type Output = Self;
+    fn add(self, Counter(rhs): Self) -> Self::Output {
+        Counter(self.0 + rhs)
+    }
+}
+
+impl Counter {
+
+    /// Increment the counter by one.
+    ///
+    /// This function wraps on overflows.
+    pub fn incr(&mut self) -> &mut Self {
+        (*self).0 += Wrapping(1);
+        self
     }
 }
 
@@ -345,16 +395,16 @@ impl Aggregate {
 
             Event::StreamRequestFail(ref req, ref fail) => {
                 let labels = Arc::new(RequestLabels::new(req));
                 self.update(|metrics| {
-                    *metrics.request_total(&labels) += 1;
                     *metrics.request_duration(&labels) += fail.since_request_open;
+                    *metrics.request_total(&labels).incr();
                 })
             },
 
             Event::StreamRequestEnd(ref req, ref end) => {
                 let labels = Arc::new(RequestLabels::new(req));
                 self.update(|metrics| {
-                    *metrics.request_total(&labels) += 1;
+                    *metrics.request_total(&labels).incr();
                     *metrics.request_duration(&labels) +=
                         end.since_request_open;
                 })
@@ -366,7 +416,7 @@ impl Aggregate {
                     end.grpc_status,
                 ));
                 self.update(|metrics| {
-                    *metrics.response_total(&labels) += 1;
+                    *metrics.response_total(&labels).incr();
                     *metrics.response_duration(&labels) += end.since_response_open;
                     *metrics.response_latency(&labels) += end.since_request_open;
                 });
@@ -376,7 +426,7 @@ impl Aggregate {
                 // TODO: do we care about the failure's error code here?
                 let labels = Arc::new(ResponseLabels::new(res, None));
                 self.update(|metrics| {
-                    *metrics.response_total(&labels) += 1;
+                    *metrics.response_total(&labels).incr();
                     *metrics.response_duration(&labels) += fail.since_response_open;
                     *metrics.response_latency(&labels) += fail.since_request_open;
                 });
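
The doc comments in this patch lean on two numeric facts: `Wrapping<u64>` arithmetic wraps on overflow instead of panicking in debug builds, and an `f64` (the representation Prometheus uses for counter samples) can only hold integers exactly up to 2^53, since its mantissa has 52 explicit bits. The sketch below is a minimal, standalone illustration of both behaviours; it is not part of the patch and does not depend on any proxy code.

```rust
use std::num::Wrapping;

fn main() {
    // Wrapping arithmetic: incrementing past u64::MAX wraps around to zero
    // rather than triggering a debug-build overflow panic.
    let mut counter = Wrapping(std::u64::MAX);
    counter += Wrapping(1);
    assert_eq!(counter.0, 0);

    // f64 precision: 2^53 + 1 is not exactly representable, so a counter
    // exported as a 64-bit float silently loses increments once its value
    // passes the mantissa range, even though the underlying u64 is exact.
    let limit = 1u64 << 53;
    assert_eq!(limit as f64, (limit + 1) as f64);

    println!("wrapping and f64-precision checks hold");
}
```

This precision cliff is what the TODOs refer to: wrapping keeps `rate()` and `irate()` usable across overflows, but the counters should still be reset to zero on scrape well before they approach 2^53.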