From 81d4b8b78373471123c004cb72962311777d27f1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Mar 2018 14:03:12 -0700 Subject: [PATCH] All counters in proxy telemetry wrap on overflows (#603) In #602, @olix0r suggested that telemetry counters should wrap on overflows, as "most timeseries systems (like prometheus) are designed to handle this case gracefully." This PR changes counters to use explicitly wrapping arithmetic. Closes #602. Signed-off-by: Eliza Weisman --- proxy/src/telemetry/metrics/latency.rs | 88 +++++++++-------------- proxy/src/telemetry/metrics/mod.rs | 10 ++- proxy/src/telemetry/metrics/prometheus.rs | 78 ++++++++++++++++---- 3 files changed, 108 insertions(+), 68 deletions(-) diff --git a/proxy/src/telemetry/metrics/latency.rs b/proxy/src/telemetry/metrics/latency.rs index fab4fa4f1..cb16bf0f4 100644 --- a/proxy/src/telemetry/metrics/latency.rs +++ b/proxy/src/telemetry/metrics/latency.rs @@ -1,8 +1,10 @@ #![deny(missing_docs)] -use std::{fmt, ops, slice, u32}; -use std::default::Default; +use std::{fmt, iter, ops, slice, u32}; +use std::num::Wrapping; use std::time::Duration; +use super::prometheus; + /// The number of buckets in a latency histogram. pub const NUM_BUCKETS: usize = 26; @@ -56,16 +58,35 @@ pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [ ]; /// A series of latency values and counts. -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone)] pub struct Histogram { /// Array of buckets in which to count latencies. /// /// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`. - buckets: [u32; NUM_BUCKETS], + buckets: [prometheus::Counter; NUM_BUCKETS], /// The total sum of all observed latency values. - pub sum: u64, + /// + /// Histogram sums always explicitly wrap on overflows rather than + /// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`] + /// queries handle breaks in monotonicity gracefully (see also + /// [`resets()`]), so wrapping is less problematic than panicking in this + /// case. + /// + /// Note, however, that Prometheus actually represents this using 64-bit + /// floating-point numbers. The correct semantics are to ensure the sum + /// always gets reset to zero after Prometheus reads it, before it would + /// ever overflow a 52-bit `f64` mantissa. + /// + /// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate() + /// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate() + /// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets + /// + // TODO: Implement Prometheus reset semantics correctly, taking into + // consideration that Prometheus represents this as `f64` and so + // there are only 52 significant bits. + pub sum: Wrapping, } /// A latency in tenths of a millisecond. @@ -87,41 +108,8 @@ impl Histogram { .position(|max| &measurement <= max) .expect("latency value greater than u32::MAX; this shouldn't be \ possible."); - self.buckets[i] += 1; - - // It's time to play ~*Will It Overflow???*~ - // - // If we make the fairly generous assumptions of 1-minute latencies - // and 1 million RPS per set of metric labels (i.e. per pod), that - // gives us: - // 600,000 (1 minute = 600,000 tenths-of-milliseconds) - // x 1,000,000 (1 million RPS) - // --------------- - // 600,000,000,000 (6e11) gain per second - // - // times the number of seconds in a day (86,400): - // 6e11 x 86400 = 5.184e16 - // - // 18,446,744,073,709,551,615 is the maximum 64-bit unsigned integer. - // 1.8446744073709551615e19 / 5.184e16 = 355 (about 1 year) - // - // So at 1 million RPS with 1-minute latencies, the sum will overflow - // in about a year. We don't really expect a conduit-proxy process to - // run that long (or see this kind of load), but we can revisit this - // if supporting extremely long-running deployments becomes a priority. - // - // (N.B. that storing latencies in whole milliseconds rather than tenths - // of milliseconds would change the time to overflow to almost 10 - // years.) - self.sum += measurement.0 as u64; - } - - /// Construct a new, empty `Histogram`. - pub fn new() -> Self { - Histogram { - buckets: [0; NUM_BUCKETS], - sum: 0, - } + self.buckets[i].incr(); + self.sum += Wrapping(measurement.0 as u64); } /// Return the sum value of this histogram in milliseconds. @@ -130,7 +118,7 @@ impl Histogram { /// internally recorded in tenths of milliseconds, which could /// represent a number of milliseconds with a fractional part. pub fn sum_in_ms(&self) -> f64 { - self.sum as f64 / MS_TO_TENTHS_OF_MS as f64 + self.sum.0 as f64 / MS_TO_TENTHS_OF_MS as f64 } } @@ -146,25 +134,19 @@ where } - impl<'a> IntoIterator for &'a Histogram { - type Item = &'a u32; - type IntoIter = slice::Iter<'a, u32>; + type Item = u64; + type IntoIter = iter::Map< + slice::Iter<'a, prometheus::Counter>, + fn(&'a prometheus::Counter) -> u64 + >; fn into_iter(self) -> Self::IntoIter { - self.buckets.iter() + self.buckets.iter().map(|&count| count.into()) } } - -impl Default for Histogram { - #[inline] - fn default() -> Self { - Self::new() - } -} - // ===== impl Latency ===== diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index 9ba5f31f4..b7a7b2e4a 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -251,7 +251,15 @@ impl Metrics { ends: ends, response_latency_counts: res_stats.latencies .into_iter() - .map(|l| *l) + // NOTE: this potential truncation is unlikely to cause + // problems here, as the push metrics reports have + // different semantics from the scrapable Prometheus + // metrics. Push metrics are reset every time a report + // is generated, while the scrapable metrics last for + // the entire lifetime of the process. + // + // Furthermore, this code is slated for removal soon. + .map(|count| count as u32) .collect(), }); } diff --git a/proxy/src/telemetry/metrics/prometheus.rs b/proxy/src/telemetry/metrics/prometheus.rs index 66f6a15c1..6fb75b09a 100644 --- a/proxy/src/telemetry/metrics/prometheus.rs +++ b/proxy/src/telemetry/metrics/prometheus.rs @@ -1,6 +1,7 @@ use std::default::Default; -use std::{fmt, u32, time}; +use std::{fmt, ops, time, u32}; use std::hash::Hash; +use std::num::Wrapping; use std::sync::{Arc, Mutex}; use futures::future::{self, FutureResult}; @@ -38,8 +39,27 @@ struct Metric { values: IndexMap } +/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int. +/// +/// Counters always explicitly wrap on overflows rather than panicking in +/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks +/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less +/// problematic than panicking in this case. +/// +/// Note, however, that Prometheus represents counters using 64-bit +/// floating-point numbers. The correct semantics are to ensure the counter +/// always gets reset to zero after Prometheus reads it, before it would ever +/// overflow a 52-bit `f64` mantissa. +/// +/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate() +/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate() +/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets +/// +// TODO: Implement Prometheus reset semantics correctly, taking into +// consideration that Prometheus models counters as `f64` and so +// there are only 52 significant bits. #[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] -struct Counter(u64); +pub struct Counter(Wrapping); /// Tracks Prometheus metrics #[derive(Debug)] @@ -47,7 +67,6 @@ pub struct Aggregate { metrics: Arc>, } - /// Serve Prometheues metrics. #[derive(Debug, Clone)] pub struct Serve { @@ -171,10 +190,12 @@ impl Metrics { } } - fn request_total(&mut self, labels: &Arc) -> &mut u64 { - &mut self.request_total.values + fn request_total(&mut self, + labels: &Arc) + -> &mut Counter { + self.request_total.values .entry(labels.clone()) - .or_insert_with(Default::default).0 + .or_insert_with(Default::default) } fn request_duration(&mut self, @@ -201,10 +222,12 @@ impl Metrics { .or_insert_with(Default::default) } - fn response_total(&mut self, labels: &Arc) -> &mut u64 { - &mut self.response_total.values + fn response_total(&mut self, + labels: &Arc) + -> &mut Counter { + self.response_total.values .entry(labels.clone()) - .or_insert_with(Default::default).0 + .or_insert_with(Default::default) } } @@ -222,9 +245,36 @@ impl fmt::Display for Metrics { } } + +// ===== impl Counter ===== + impl fmt::Display for Counter { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0 as f64) + write!(f, "{}", (self.0).0 as f64) + } +} + +impl Into for Counter { + fn into(self) -> u64 { + (self.0).0 as u64 + } +} + +impl ops::Add for Counter { + type Output = Self; + fn add(self, Counter(rhs): Self) -> Self::Output { + Counter(self.0 + rhs) + } +} + +impl Counter { + + /// Increment the counter by one. + /// + /// This function wraps on overflows. + pub fn incr(&mut self) -> &mut Self { + (*self).0 += Wrapping(1); + self } } @@ -345,16 +395,16 @@ impl Aggregate { Event::StreamRequestFail(ref req, ref fail) => { let labels = Arc::new(RequestLabels::new(req)); self.update(|metrics| { - *metrics.request_total(&labels) += 1; *metrics.request_duration(&labels) += fail.since_request_open; + *metrics.request_total(&labels).incr(); }) }, Event::StreamRequestEnd(ref req, ref end) => { let labels = Arc::new(RequestLabels::new(req)); self.update(|metrics| { - *metrics.request_total(&labels) += 1; + *metrics.request_total(&labels).incr(); *metrics.request_duration(&labels) += end.since_request_open; }) @@ -366,7 +416,7 @@ impl Aggregate { end.grpc_status, )); self.update(|metrics| { - *metrics.response_total(&labels) += 1; + *metrics.response_total(&labels).incr(); *metrics.response_duration(&labels) += end.since_response_open; *metrics.response_latency(&labels) += end.since_request_open; }); @@ -376,7 +426,7 @@ impl Aggregate { // TODO: do we care about the failure's error code here? let labels = Arc::new(ResponseLabels::new(res, None)); self.update(|metrics| { - *metrics.response_total(&labels) += 1; + *metrics.response_total(&labels).incr(); *metrics.response_duration(&labels) += fail.since_response_open; *metrics.response_latency(&labels) += fail.since_request_open; });