proxy: Make `Histogram` generic over its value (#868)

In order to support histograms measured in, for instance, microseconds,
Histogram should blind store integers without being aware of the unit.
In order to accomplish this, we make `Histogram` generic over a `V:
Into<u64>`, such that all values added to the histogram must be of type
`V`.

In doing this, we also make the histogram buckets configurable, though
we maintain the same defaults used for latency values.

The `Histogram` type has been moved to a new module, and the `Bucket`
and `Bounds` helper types have been introduced to help make histogram
logic clearer and latency-agnostic.
This commit is contained in:
Oliver Gould 2018-04-27 14:43:09 -07:00 committed by GitHub
parent 2e296dbf69
commit cf470439ef
3 changed files with 200 additions and 231 deletions

View File

@ -0,0 +1,125 @@
use std::{cmp, fmt, iter, slice};
use std::num::Wrapping;
use std::marker::PhantomData;
use super::Counter;
/// A series of latency values and counts.
#[derive(Debug, Clone)]
pub struct Histogram<V: Into<u64>> {
bounds: &'static Bounds,
buckets: Box<[Counter]>,
/// The total sum of all observed latency values.
///
/// Histogram sums always explicitly wrap on overflows rather than
/// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
/// queries handle breaks in monotonicity gracefully (see also
/// [`resets()`]), so wrapping is less problematic than panicking in this
/// case.
///
/// Note, however, that Prometheus actually represents this using 64-bit
/// floating-point numbers. The correct semantics are to ensure the sum
/// always gets reset to zero after Prometheus reads it, before it would
/// ever overflow a 52-bit `f64` mantissa.
///
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
///
// TODO: Implement Prometheus reset semantics correctly, taking into consideration
// that Prometheus represents this as `f64` and so there are only 52 significant
// bits.
pub sum: Wrapping<u64>,
_p: PhantomData<V>,
}
#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)]
pub enum Bucket {
Le(u64),
Inf,
}
/// A series of increasing Buckets values.
#[derive(Debug)]
pub struct Bounds(pub &'static [Bucket]);
// ===== impl Histogram =====
impl<V: Into<u64>> Histogram<V> {
pub fn new(bounds: &'static Bounds) -> Self {
let mut buckets = Vec::with_capacity(bounds.0.len());
let mut prior = &Bucket::Le(0);
for bound in bounds.0.iter() {
assert!(prior < bound);
buckets.push(Counter::default());
prior = bound;
}
Self {
bounds,
buckets: buckets.into_boxed_slice(),
sum: Wrapping(0),
_p: PhantomData,
}
}
pub fn add(&mut self, v: V) {
let value = v.into();
let idx = self.bounds.0.iter()
.position(|b| match *b {
Bucket::Le(ceiling) => value <= ceiling,
Bucket::Inf => true,
})
.expect("all values must fit into a bucket");
self.buckets[idx].incr();
self.sum += Wrapping(value);
}
pub fn sum(&self) -> u64 {
self.sum.0
}
}
impl<'a, V: Into<u64>> IntoIterator for &'a Histogram<V> {
type Item = (&'a Bucket, &'a Counter);
type IntoIter = iter::Zip<
slice::Iter<'a, Bucket>,
slice::Iter<'a, Counter>,
>;
fn into_iter(self) -> Self::IntoIter {
self.bounds.0.iter().zip(self.buckets.iter())
}
}
// ===== impl Bucket =====
impl fmt::Display for Bucket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Bucket::Le(v) => write!(f, "{}", v),
Bucket::Inf => write!(f, "+Inf"),
}
}
}
impl cmp::PartialOrd<Bucket> for Bucket {
fn partial_cmp(&self, rhs: &Bucket) -> Option<cmp::Ordering> {
Some(self.cmp(rhs))
}
}
impl cmp::Ord for Bucket {
fn cmp(&self, rhs: &Bucket) -> cmp::Ordering {
match (*self, *rhs) {
(Bucket::Le(s), Bucket::Le(r)) => s.cmp(&r),
(Bucket::Le(_), Bucket::Inf) => cmp::Ordering::Less,
(Bucket::Inf, Bucket::Le(_)) => cmp::Ordering::Greater,
(Bucket::Inf, Bucket::Inf) => cmp::Ordering::Equal,
}
}
}

View File

@ -1,229 +1,76 @@
#![deny(missing_docs)]
use std::{fmt, iter, ops, slice, u32};
use std::num::Wrapping;
use std::time::Duration;
use super::Counter;
/// The number of buckets in a latency histogram.
pub const NUM_BUCKETS: usize = 26;
use super::histogram::{Bounds, Bucket, Histogram};
/// The maximum value (inclusive) for each latency bucket in
/// tenths of a millisecond.
pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [
pub const BOUNDS: &Bounds = &Bounds(&[
// The controller telemetry server creates 5 sets of 5 linear buckets
// each:
// TODO: it would be nice if we didn't have to hard-code each
// individual bucket and could use Rust ranges or something.
// However, because we're using a raw fixed size array rather
// than a vector (as we don't ever expect to grow this array
// and thus don't _need_ a vector) we can't concatenate it
// from smaller arrays, making it difficult to construct
// programmatically...
// in the controller:
// prometheus.LinearBuckets(1, 1, 5),
Latency(10),
Latency(20),
Latency(30),
Latency(40),
Latency(50),
Bucket::Le(10),
Bucket::Le(20),
Bucket::Le(30),
Bucket::Le(40),
Bucket::Le(50),
// prometheus.LinearBuckets(10, 10, 5),
Latency(100),
Latency(200),
Latency(300),
Latency(400),
Latency(500),
Bucket::Le(100),
Bucket::Le(200),
Bucket::Le(300),
Bucket::Le(400),
Bucket::Le(500),
// prometheus.LinearBuckets(100, 100, 5),
Latency(1_000),
Latency(2_000),
Latency(3_000),
Latency(4_000),
Latency(5_000),
Bucket::Le(1_000),
Bucket::Le(2_000),
Bucket::Le(3_000),
Bucket::Le(4_000),
Bucket::Le(5_000),
// prometheus.LinearBuckets(1000, 1000, 5),
Latency(10_000),
Latency(20_000),
Latency(30_000),
Latency(40_000),
Latency(50_000),
Bucket::Le(10_000),
Bucket::Le(20_000),
Bucket::Le(30_000),
Bucket::Le(40_000),
Bucket::Le(50_000),
// prometheus.LinearBuckets(10000, 10000, 5),
Latency(100_000),
Latency(200_000),
Latency(300_000),
Latency(400_000),
Latency(500_000),
// Prometheus implicitly creates a max bucket for everything that
// falls outside of the highest-valued bucket, but we need to
// create it explicitly.
Latency(u32::MAX),
];
Bucket::Le(100_000),
Bucket::Le(200_000),
Bucket::Le(300_000),
Bucket::Le(400_000),
Bucket::Le(500_000),
// A final upper bound.
Bucket::Inf,
]);
/// A series of latency values and counts.
/// A duration in milliseconds.
#[derive(Debug, Default, Clone)]
pub struct Histogram {
pub struct Ms(pub Duration);
/// Array of buckets in which to count latencies.
///
/// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
buckets: [Counter; NUM_BUCKETS],
// /// A duration in microseconds.
// #[derive(Debug, Default, Clone)]
// pub struct Us(pub Duration);
/// The total sum of all observed latency values.
///
/// Histogram sums always explicitly wrap on overflows rather than
/// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
/// queries handle breaks in monotonicity gracefully (see also
/// [`resets()`]), so wrapping is less problematic than panicking in this
/// case.
///
/// Note, however, that Prometheus actually represents this using 64-bit
/// floating-point numbers. The correct semantics are to ensure the sum
/// always gets reset to zero after Prometheus reads it, before it would
/// ever overflow a 52-bit `f64` mantissa.
///
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
///
// TODO: Implement Prometheus reset semantics correctly, taking into
// consideration that Prometheus represents this as `f64` and so
// there are only 52 significant bits.
pub sum: Wrapping<u64>,
}
/// A latency in tenths of a millisecond.
#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash)]
pub struct Latency(u32);
// ===== impl Histogram =====
impl Histogram {
/// Observe a measurement
pub fn observe<I>(&mut self, measurement: I)
where
I: Into<Latency>,
{
let measurement = measurement.into();
let i = BUCKET_BOUNDS.iter()
.position(|max| &measurement <= max)
.expect("latency value greater than u32::MAX; this shouldn't be \
possible.");
self.buckets[i].incr();
self.sum += Wrapping(measurement.0 as u64);
}
/// Return the sum value of this histogram in milliseconds.
///
/// The sum is returned as a floating-point value, as it's
/// internally recorded in tenths of milliseconds, which could
/// represent a number of milliseconds with a fractional part.
pub fn sum_in_ms(&self) -> f64 {
self.sum.0 as f64 / MS_TO_TENTHS_OF_MS as f64
}
}
impl<I> ops::AddAssign<I> for Histogram
where
I: Into<Latency>
{
#[inline]
fn add_assign(&mut self, measurement: I) {
self.observe(measurement)
}
}
impl<'a> IntoIterator for &'a Histogram {
type Item = u64;
type IntoIter = iter::Map<
slice::Iter<'a, Counter>,
fn(&'a Counter) -> u64
>;
fn into_iter(self) -> Self::IntoIter {
self.buckets.iter().map(|&count| count.into())
}
}
// ===== impl Latency =====
const SEC_TO_MS: u32 = 1_000;
const SEC_TO_TENTHS_OF_A_MS: u32 = SEC_TO_MS * 10;
const TENTHS_OF_MS_TO_NS: u32 = MS_TO_NS / 10;
const MS_TO_TENTHS_OF_MS: u32 = 10;
/// Conversion ratio from milliseconds to nanoseconds.
pub const MS_TO_NS: u32 = 1_000_000;
impl From<Duration> for Latency {
fn from(dur: Duration) -> Self {
let secs = dur.as_secs();
// checked conversion from u64 -> u32.
let secs =
if secs >= u64::from(u32::MAX) {
None
} else {
Some(secs as u32)
};
// represent the duration as tenths of a ms.
let tenths_of_ms = {
let t = secs.and_then(|as_secs|
// convert the number of seconds to tenths of a ms, or
// None on overflow.
as_secs.checked_mul(SEC_TO_TENTHS_OF_A_MS)
);
let t = t.and_then(|as_tenths_ms| {
// convert the subsecond part of the duration (in ns) to
// tenths of a millisecond.
let subsec_tenths_ms = dur.subsec_nanos() / TENTHS_OF_MS_TO_NS;
as_tenths_ms.checked_add(subsec_tenths_ms)
});
t.unwrap_or_else(|| {
debug!(
"{:?} too large to represent as tenths of a \
millisecond!",
dur
);
u32::MAX
})
};
Latency(tenths_of_ms)
}
}
impl From<u32> for Latency {
#[inline]
fn from(value: u32) -> Self {
Latency(value)
impl Into<u64> for Ms {
fn into(self) -> u64 {
self.0.as_secs().saturating_mul(1_000)
.saturating_add(u64::from(self.0.subsec_nanos()) / 1_000_000)
}
}
impl Into<u32> for Latency {
fn into(self) -> u32 {
self.0
impl Default for Histogram<Ms> {
fn default() -> Self {
Histogram::new(BOUNDS)
}
}
impl fmt::Display for Latency {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0 == u32::MAX {
// Prometheus requires that text representations of numbers be in
// a format understandable by Go's strconv package. In particular,
// `-Inf`, `+Inf`, and `Nan` are used as the textual
// representations of floating point special values.
//
// We're representing latency buckets with u32s rather than floats,
// so we won't encounter these special values, but we want to treat
// the u32::MAX upper bound as the infinity bucket, so special case
// the formatting for u32::MAX.
write!(f, "+Inf")
} else {
// NOTE: if bucket values are changed so that they're no longer
// evenly divisible by ten, we may want to ensure that there's
// a reasonable cap on precision here.
write!(f, "{}", self.0 / MS_TO_TENTHS_OF_MS)
}
}
}
// impl Into<u64> for Us {
// fn into(self) -> u64 {
// self.0.as_secs().saturating_mul(1_000_000)
// .saturating_add(u64::from(self.0.subsec_nanos()) / 1_000)
// }
// }
// impl Default for Histogram<Us> {
// fn default() -> Self {
// Histogram::new(&BOUNDS)
// }
// }

View File

@ -49,18 +49,19 @@ use telemetry::event::Event;
mod counter;
mod gauge;
mod histogram;
mod labels;
mod latency;
use self::counter::Counter;
use self::gauge::Gauge;
use self::histogram::Histogram;
use self::labels::{
RequestLabels,
ResponseLabels,
TransportLabels,
TransportCloseLabels
};
use self::latency::{BUCKET_BOUNDS, Histogram};
pub use self::labels::DstLabels;
#[derive(Debug, Clone)]
@ -68,7 +69,7 @@ struct Metrics {
request_total: Metric<Counter, Arc<RequestLabels>>,
response_total: Metric<Counter, Arc<ResponseLabels>>,
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
response_latency: Metric<Histogram<latency::Ms>, Arc<ResponseLabels>>,
tcp: TcpMetrics,
@ -80,7 +81,7 @@ struct TcpMetrics {
open_total: Metric<Counter, Arc<TransportLabels>>,
close_total: Metric<Counter, Arc<TransportCloseLabels>>,
connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
connection_duration: Metric<Histogram<latency::Ms>, Arc<TransportCloseLabels>>,
open_connections: Metric<Gauge, Arc<TransportLabels>>,
write_bytes_total: Metric<Counter, Arc<TransportLabels>>,
@ -141,7 +142,7 @@ impl Metrics {
"A counter of the number of responses the proxy has received.",
);
let response_latency = Metric::<Histogram, Arc<ResponseLabels>>::new(
let response_latency = Metric::<Histogram<latency::Ms>, Arc<ResponseLabels>>::new(
"response_latency_ms",
"A histogram of the total latency of a response. This is measured \
from when the request headers are received to when the response \
@ -167,7 +168,7 @@ impl Metrics {
fn response_latency(&mut self,
labels: &Arc<ResponseLabels>)
-> &mut Histogram {
-> &mut Histogram<latency::Ms> {
self.response_latency.values
.entry(labels.clone())
.or_insert_with(Histogram::default)
@ -212,7 +213,7 @@ impl TcpMetrics {
"A counter of the total number of transport connections.",
);
let connection_duration = Metric::<Histogram, Arc<TransportCloseLabels>>::new(
let connection_duration = Metric::<Histogram<latency::Ms>, Arc<TransportCloseLabels>>::new(
"tcp_connection_duration_ms",
"A histogram of the duration of the lifetime of connections, in milliseconds",
);
@ -254,7 +255,7 @@ impl TcpMetrics {
.or_insert_with(Counter::default)
}
fn connection_duration(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Histogram {
fn connection_duration(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Histogram<latency::Ms> {
self.connection_duration.values
.entry(labels.clone())
.or_insert_with(Histogram::default)
@ -354,9 +355,10 @@ where
}
}
impl<L> fmt::Display for Metric<Histogram, L> where
impl<L, V> fmt::Display for Metric<Histogram<V>, L> where
L: fmt::Display,
L: Hash + Eq,
V: Into<u64>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
@ -366,19 +368,13 @@ impl<L> fmt::Display for Metric<Histogram, L> where
)?;
for (labels, histogram) in &self.values {
// Look up the bucket numbers against the BUCKET_BOUNDS array
// to turn them into upper bounds.
let bounds_and_counts = histogram.into_iter()
.enumerate()
.map(|(num, count)| (BUCKET_BOUNDS[num], count));
// Since Prometheus expects each bucket's value to be the sum of
// the number of values in this bucket and all lower buckets,
// track the total count here.
let mut total_count = 0;
for (le, count) in bounds_and_counts {
// Since Prometheus expects each bucket's value to be the sum of the number of
// values in this bucket and all lower buckets, track the total count here.
let mut total_count = 0u64;
for (le, count) in histogram.into_iter() {
// Add this bucket's count to the total count.
total_count += count;
let c: u64 = (*count).into();
total_count += c;
write!(f, "{name}_bucket{{{labels},le=\"{le}\"}} {count}\n",
name = self.name,
labels = labels,
@ -395,7 +391,7 @@ impl<L> fmt::Display for Metric<Histogram, L> where
name = self.name,
labels = labels,
count = total_count,
sum = histogram.sum_in_ms(),
sum = histogram.sum(),
)?;
}
@ -451,7 +447,7 @@ impl Aggregate {
));
self.update(|metrics| {
metrics.response_total(&labels).incr();
*metrics.response_latency(&labels) += end.since_request_open;
metrics.response_latency(&labels).add(latency::Ms(end.since_request_open));
});
},
@ -460,7 +456,7 @@ impl Aggregate {
let labels = Arc::new(ResponseLabels::fail(res));
self.update(|metrics| {
metrics.response_total(&labels).incr();
*metrics.response_latency(&labels) += fail.since_request_open;
metrics.response_latency(&labels).add(latency::Ms(fail.since_request_open));
});
},
@ -479,7 +475,8 @@ impl Aggregate {
*metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64;
*metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
*metrics.tcp().connection_duration(&close_labels) += close.duration;
metrics.tcp().connection_duration(&close_labels)
.add(latency::Ms(close.duration));
metrics.tcp().close_total(&close_labels).incr();
let metrics = metrics.tcp().open_connections.values.get_mut(&labels);