All counters in proxy telemetry wrap on overflows (#603)

In #602, @olix0r suggested that telemetry counters should wrap on overflows, as "most timeseries systems (like prometheus) are designed to handle this case gracefully."

This PR changes counters to use explicitly wrapping arithmetic.

Closes #602.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2018-03-27 14:03:12 -07:00 committed by GitHub
parent ea2668b3ca
commit 81d4b8b783
3 changed files with 108 additions and 68 deletions

View File

@ -1,8 +1,10 @@
#![deny(missing_docs)]
use std::{fmt, ops, slice, u32};
use std::default::Default;
use std::{fmt, iter, ops, slice, u32};
use std::num::Wrapping;
use std::time::Duration;
use super::prometheus;
/// The number of buckets in a latency histogram.
pub const NUM_BUCKETS: usize = 26;
@ -56,16 +58,35 @@ pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [
];
/// A series of latency values and counts.
#[derive(Debug, Clone)]
#[derive(Debug, Default, Clone)]
pub struct Histogram {
/// Array of buckets in which to count latencies.
///
/// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
buckets: [u32; NUM_BUCKETS],
buckets: [prometheus::Counter; NUM_BUCKETS],
/// The total sum of all observed latency values.
pub sum: u64,
///
/// Histogram sums always explicitly wrap on overflows rather than
/// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
/// queries handle breaks in monotonicity gracefully (see also
/// [`resets()`]), so wrapping is less problematic than panicking in this
/// case.
///
/// Note, however, that Prometheus actually represents this using 64-bit
/// floating-point numbers. The correct semantics are to ensure the sum
/// always gets reset to zero after Prometheus reads it, before it would
/// ever overflow a 52-bit `f64` mantissa.
///
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
///
// TODO: Implement Prometheus reset semantics correctly, taking into
// consideration that Prometheus represents this as `f64` and so
// there are only 52 significant bits.
pub sum: Wrapping<u64>,
}
/// A latency in tenths of a millisecond.
@ -87,41 +108,8 @@ impl Histogram {
.position(|max| &measurement <= max)
.expect("latency value greater than u32::MAX; this shouldn't be \
possible.");
self.buckets[i] += 1;
// It's time to play ~*Will It Overflow???*~
//
// If we make the fairly generous assumptions of 1-minute latencies
// and 1 million RPS per set of metric labels (i.e. per pod), that
// gives us:
// 600,000 (1 minute = 600,000 tenths-of-milliseconds)
// x 1,000,000 (1 million RPS)
// ---------------
// 600,000,000,000 (6e11) gain per second
//
// times the number of seconds in a day (86,400):
// 6e11 x 86400 = 5.184e16
//
// 18,446,744,073,709,551,615 is the maximum 64-bit unsigned integer.
// 1.8446744073709551615e19 / 5.184e16 = 355 (about 1 year)
//
// So at 1 million RPS with 1-minute latencies, the sum will overflow
// in about a year. We don't really expect a conduit-proxy process to
// run that long (or see this kind of load), but we can revisit this
// if supporting extremely long-running deployments becomes a priority.
//
// (N.B. that storing latencies in whole milliseconds rather than tenths
// of milliseconds would change the time to overflow to almost 10
// years.)
self.sum += measurement.0 as u64;
}
/// Construct a new, empty `Histogram`.
pub fn new() -> Self {
Histogram {
buckets: [0; NUM_BUCKETS],
sum: 0,
}
self.buckets[i].incr();
self.sum += Wrapping(measurement.0 as u64);
}
/// Return the sum value of this histogram in milliseconds.
@ -130,7 +118,7 @@ impl Histogram {
/// internally recorded in tenths of milliseconds, which could
/// represent a number of milliseconds with a fractional part.
pub fn sum_in_ms(&self) -> f64 {
self.sum as f64 / MS_TO_TENTHS_OF_MS as f64
self.sum.0 as f64 / MS_TO_TENTHS_OF_MS as f64
}
}
@ -146,25 +134,19 @@ where
}
impl<'a> IntoIterator for &'a Histogram {
type Item = &'a u32;
type IntoIter = slice::Iter<'a, u32>;
type Item = u64;
type IntoIter = iter::Map<
slice::Iter<'a, prometheus::Counter>,
fn(&'a prometheus::Counter) -> u64
>;
fn into_iter(self) -> Self::IntoIter {
self.buckets.iter()
self.buckets.iter().map(|&count| count.into())
}
}
impl Default for Histogram {
#[inline]
fn default() -> Self {
Self::new()
}
}
// ===== impl Latency =====

View File

@ -251,7 +251,15 @@ impl Metrics {
ends: ends,
response_latency_counts: res_stats.latencies
.into_iter()
.map(|l| *l)
// NOTE: this potential truncation is unlikely to cause
// problems here, as the push metrics reports have
// different semantics from the scrapable Prometheus
// metrics. Push metrics are reset every time a report
// is generated, while the scrapable metrics last for
// the entire lifetime of the process.
//
// Furthermore, this code is slated for removal soon.
.map(|count| count as u32)
.collect(),
});
}

View File

@ -1,6 +1,7 @@
use std::default::Default;
use std::{fmt, u32, time};
use std::{fmt, ops, time, u32};
use std::hash::Hash;
use std::num::Wrapping;
use std::sync::{Arc, Mutex};
use futures::future::{self, FutureResult};
@ -38,8 +39,27 @@ struct Metric<M, L: Hash + Eq> {
values: IndexMap<L, M>
}
/// A Prometheus counter is represented by a `Wrapping` unsigned 64-bit int.
///
/// Counters always explicitly wrap on overflows rather than panicking in
/// debug builds. Prometheus' [`rate()`] and [`irate()`] queries handle breaks
/// in monotonicity gracefully (see also [`resets()`]), so wrapping is less
/// problematic than panicking in this case.
///
/// Note, however, that Prometheus represents counters using 64-bit
/// floating-point numbers. The correct semantics are to ensure the counter
/// always gets reset to zero after Prometheus reads it, before it would ever
/// overflow a 52-bit `f64` mantissa.
///
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
///
// TODO: Implement Prometheus reset semantics correctly, taking into
// consideration that Prometheus models counters as `f64` and so
// there are only 52 significant bits.
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
struct Counter(u64);
pub struct Counter(Wrapping<u64>);
/// Tracks Prometheus metrics
#[derive(Debug)]
@ -47,7 +67,6 @@ pub struct Aggregate {
metrics: Arc<Mutex<Metrics>>,
}
/// Serve Prometheues metrics.
#[derive(Debug, Clone)]
pub struct Serve {
@ -171,10 +190,12 @@ impl Metrics {
}
}
fn request_total(&mut self, labels: &Arc<RequestLabels>) -> &mut u64 {
&mut self.request_total.values
fn request_total(&mut self,
labels: &Arc<RequestLabels>)
-> &mut Counter {
self.request_total.values
.entry(labels.clone())
.or_insert_with(Default::default).0
.or_insert_with(Default::default)
}
fn request_duration(&mut self,
@ -201,10 +222,12 @@ impl Metrics {
.or_insert_with(Default::default)
}
fn response_total(&mut self, labels: &Arc<ResponseLabels>) -> &mut u64 {
&mut self.response_total.values
fn response_total(&mut self,
labels: &Arc<ResponseLabels>)
-> &mut Counter {
self.response_total.values
.entry(labels.clone())
.or_insert_with(Default::default).0
.or_insert_with(Default::default)
}
}
@ -222,9 +245,36 @@ impl fmt::Display for Metrics {
}
}
// ===== impl Counter =====
impl fmt::Display for Counter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0 as f64)
write!(f, "{}", (self.0).0 as f64)
}
}
impl Into<u64> for Counter {
fn into(self) -> u64 {
(self.0).0 as u64
}
}
impl ops::Add for Counter {
type Output = Self;
fn add(self, Counter(rhs): Self) -> Self::Output {
Counter(self.0 + rhs)
}
}
impl Counter {
/// Increment the counter by one.
///
/// This function wraps on overflows.
pub fn incr(&mut self) -> &mut Self {
(*self).0 += Wrapping(1);
self
}
}
@ -345,16 +395,16 @@ impl Aggregate {
Event::StreamRequestFail(ref req, ref fail) => {
let labels = Arc::new(RequestLabels::new(req));
self.update(|metrics| {
*metrics.request_total(&labels) += 1;
*metrics.request_duration(&labels) +=
fail.since_request_open;
*metrics.request_total(&labels).incr();
})
},
Event::StreamRequestEnd(ref req, ref end) => {
let labels = Arc::new(RequestLabels::new(req));
self.update(|metrics| {
*metrics.request_total(&labels) += 1;
*metrics.request_total(&labels).incr();
*metrics.request_duration(&labels) +=
end.since_request_open;
})
@ -366,7 +416,7 @@ impl Aggregate {
end.grpc_status,
));
self.update(|metrics| {
*metrics.response_total(&labels) += 1;
*metrics.response_total(&labels).incr();
*metrics.response_duration(&labels) += end.since_response_open;
*metrics.response_latency(&labels) += end.since_request_open;
});
@ -376,7 +426,7 @@ impl Aggregate {
// TODO: do we care about the failure's error code here?
let labels = Arc::new(ResponseLabels::new(res, None));
self.update(|metrics| {
*metrics.response_total(&labels) += 1;
*metrics.response_total(&labels).incr();
*metrics.response_duration(&labels) += fail.since_response_open;
*metrics.response_latency(&labels) += fail.since_request_open;
});