From cf470439ef84ac80d35974714357b3ebb76e026e Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Fri, 27 Apr 2018 14:43:09 -0700
Subject: [PATCH] proxy: Make `Histogram` generic over its value (#868)

In order to support histograms measured in, for instance, microseconds,
`Histogram` should blindly store integers without being aware of the unit.

In order to accomplish this, we make `Histogram` generic over a
`V: Into<u64>`, such that all values added to the histogram must be of
type `V`.

In doing this, we also make the histogram buckets configurable, though we
maintain the same defaults used for latency values.

The `Histogram` type has been moved to a new module, and the `Bucket` and
`Bounds` helper types have been introduced to help make histogram logic
clearer and latency-agnostic.
---
 proxy/src/telemetry/metrics/histogram.rs | 125 +++++++++++
 proxy/src/telemetry/metrics/latency.rs   | 261 +++++------------------
 proxy/src/telemetry/metrics/mod.rs       |  45 ++--
 3 files changed, 200 insertions(+), 231 deletions(-)
 create mode 100644 proxy/src/telemetry/metrics/histogram.rs

diff --git a/proxy/src/telemetry/metrics/histogram.rs b/proxy/src/telemetry/metrics/histogram.rs
new file mode 100644
index 000000000..f151637c2
--- /dev/null
+++ b/proxy/src/telemetry/metrics/histogram.rs
@@ -0,0 +1,125 @@
+use std::{cmp, fmt, iter, slice};
+use std::num::Wrapping;
+use std::marker::PhantomData;
+
+use super::Counter;
+
+/// A series of latency values and counts.
+#[derive(Debug, Clone)]
+pub struct Histogram<V: Into<u64>> {
+    bounds: &'static Bounds,
+    buckets: Box<[Counter]>,
+
+    /// The total sum of all observed latency values.
+    ///
+    /// Histogram sums always explicitly wrap on overflows rather than
+    /// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
+    /// queries handle breaks in monotonicity gracefully (see also
+    /// [`resets()`]), so wrapping is less problematic than panicking in this
+    /// case.
+    ///
+    /// Note, however, that Prometheus actually represents this using 64-bit
+    /// floating-point numbers. The correct semantics are to ensure the sum
+    /// always gets reset to zero after Prometheus reads it, before it would
+    /// ever overflow a 52-bit `f64` mantissa.
+    ///
+    /// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
+    /// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
+    /// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
+    ///
+    // TODO: Implement Prometheus reset semantics correctly, taking into
+    //       consideration that Prometheus represents this as `f64` and so
+    //       there are only 52 significant bits.
+    pub sum: Wrapping<u64>,
+
+    _p: PhantomData<V>,
+}
+
+#[derive(Debug, Eq, PartialEq, Copy, Clone, Hash)]
+pub enum Bucket {
+    Le(u64),
+    Inf,
+}
+
+/// A series of increasing `Bucket` values.
+#[derive(Debug)]
+pub struct Bounds(pub &'static [Bucket]);
+
+// ===== impl Histogram =====
+
+impl<V: Into<u64>> Histogram<V> {
+    pub fn new(bounds: &'static Bounds) -> Self {
+        let mut buckets = Vec::with_capacity(bounds.0.len());
+        let mut prior = &Bucket::Le(0);
+        for bound in bounds.0.iter() {
+            assert!(prior < bound);
+            buckets.push(Counter::default());
+            prior = bound;
+        }
+
+        Self {
+            bounds,
+            buckets: buckets.into_boxed_slice(),
+            sum: Wrapping(0),
+            _p: PhantomData,
+        }
+    }
+
+    pub fn add(&mut self, v: V) {
+        let value = v.into();
+
+        let idx = self.bounds.0.iter()
+            .position(|b| match *b {
+                Bucket::Le(ceiling) => value <= ceiling,
+                Bucket::Inf => true,
+            })
+            .expect("all values must fit into a bucket");
+
+        self.buckets[idx].incr();
+        self.sum += Wrapping(value);
+    }
+
+    pub fn sum(&self) -> u64 {
+        self.sum.0
+    }
+}
+
+impl<'a, V: Into<u64>> IntoIterator for &'a Histogram<V> {
+    type Item = (&'a Bucket, &'a Counter);
+    type IntoIter = iter::Zip<
+        slice::Iter<'a, Bucket>,
+        slice::Iter<'a, Counter>,
+    >;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.bounds.0.iter().zip(self.buckets.iter())
+    }
+}
+
+// ===== impl Bucket =====
+
+impl fmt::Display for Bucket {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            Bucket::Le(v) => write!(f, "{}", v),
+            Bucket::Inf => write!(f, "+Inf"),
+        }
+    }
+}
+
+impl cmp::PartialOrd for Bucket {
+    fn partial_cmp(&self, rhs: &Bucket) -> Option<cmp::Ordering> {
+        Some(self.cmp(rhs))
+    }
+}
+
+impl cmp::Ord for Bucket {
+    fn cmp(&self, rhs: &Bucket) -> cmp::Ordering {
+        match (*self, *rhs) {
+            (Bucket::Le(s), Bucket::Le(r)) => s.cmp(&r),
+            (Bucket::Le(_), Bucket::Inf) => cmp::Ordering::Less,
+            (Bucket::Inf, Bucket::Le(_)) => cmp::Ordering::Greater,
+            (Bucket::Inf, Bucket::Inf) => cmp::Ordering::Equal,
+        }
+    }
+}
diff --git a/proxy/src/telemetry/metrics/latency.rs b/proxy/src/telemetry/metrics/latency.rs
index e3b1a4101..e3117289b 100644
--- a/proxy/src/telemetry/metrics/latency.rs
+++ b/proxy/src/telemetry/metrics/latency.rs
@@ -1,229 +1,76 @@
-#![deny(missing_docs)]
-use std::{fmt, iter, ops, slice, u32};
-use std::num::Wrapping;
 use std::time::Duration;
 
-use super::Counter;
-
-/// The number of buckets in a latency histogram.
-pub const NUM_BUCKETS: usize = 26;
+use super::histogram::{Bounds, Bucket, Histogram};
 
 /// The maximum value (inclusive) for each latency bucket in
 /// tenths of a millisecond.
-pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [
+pub const BOUNDS: &Bounds = &Bounds(&[
     // The controller telemetry server creates 5 sets of 5 linear buckets
     // each:
-    // TODO: it would be nice if we didn't have to hard-code each
-    //       individual bucket and could use Rust ranges or something.
-    //       However, because we're using a raw fixed size array rather
-    //       than a vector (as we don't ever expect to grow this array
-    //       and thus don't _need_ a vector) we can't concatenate it
-    //       from smaller arrays, making it difficult to construct
-    //       programmatically...
-    // in the controller:
     // prometheus.LinearBuckets(1, 1, 5),
-    Latency(10),
-    Latency(20),
-    Latency(30),
-    Latency(40),
-    Latency(50),
+    Bucket::Le(10),
+    Bucket::Le(20),
+    Bucket::Le(30),
+    Bucket::Le(40),
+    Bucket::Le(50),
     // prometheus.LinearBuckets(10, 10, 5),
-    Latency(100),
-    Latency(200),
-    Latency(300),
-    Latency(400),
-    Latency(500),
+    Bucket::Le(100),
+    Bucket::Le(200),
+    Bucket::Le(300),
+    Bucket::Le(400),
+    Bucket::Le(500),
     // prometheus.LinearBuckets(100, 100, 5),
-    Latency(1_000),
-    Latency(2_000),
-    Latency(3_000),
-    Latency(4_000),
-    Latency(5_000),
+    Bucket::Le(1_000),
+    Bucket::Le(2_000),
+    Bucket::Le(3_000),
+    Bucket::Le(4_000),
+    Bucket::Le(5_000),
     // prometheus.LinearBuckets(1000, 1000, 5),
-    Latency(10_000),
-    Latency(20_000),
-    Latency(30_000),
-    Latency(40_000),
-    Latency(50_000),
+    Bucket::Le(10_000),
+    Bucket::Le(20_000),
+    Bucket::Le(30_000),
+    Bucket::Le(40_000),
+    Bucket::Le(50_000),
     // prometheus.LinearBuckets(10000, 10000, 5),
-    Latency(100_000),
-    Latency(200_000),
-    Latency(300_000),
-    Latency(400_000),
-    Latency(500_000),
-    // Prometheus implicitly creates a max bucket for everything that
-    // falls outside of the highest-valued bucket, but we need to
-    // create it explicitly.
-    Latency(u32::MAX),
-];
+    Bucket::Le(100_000),
+    Bucket::Le(200_000),
+    Bucket::Le(300_000),
+    Bucket::Le(400_000),
+    Bucket::Le(500_000),
+    // A final upper bound.
+    Bucket::Inf,
+]);
 
-/// A series of latency values and counts.
+/// A duration in milliseconds.
 #[derive(Debug, Default, Clone)]
-pub struct Histogram {
+pub struct Ms(pub Duration);
 
-    /// Array of buckets in which to count latencies.
-    ///
-    /// The upper bound of a given bucket `i` is given in `BUCKET_BOUNDS[i]`.
-    buckets: [Counter; NUM_BUCKETS],
+// /// A duration in microseconds.
+// #[derive(Debug, Default, Clone)]
+// pub struct Us(pub Duration);
 
-    /// The total sum of all observed latency values.
-    ///
-    /// Histogram sums always explicitly wrap on overflows rather than
-    /// panicking in debug builds. Prometheus' [`rate()`] and [`irate()`]
-    /// queries handle breaks in monotonicity gracefully (see also
-    /// [`resets()`]), so wrapping is less problematic than panicking in this
-    /// case.
-    ///
-    /// Note, however, that Prometheus actually represents this using 64-bit
-    /// floating-point numbers. The correct semantics are to ensure the sum
-    /// always gets reset to zero after Prometheus reads it, before it would
-    /// ever overflow a 52-bit `f64` mantissa.
-    ///
-    /// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
-    /// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
-    /// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
-    ///
-    // TODO: Implement Prometheus reset semantics correctly, taking into
-    //       consideration that Prometheus represents this as `f64` and so
-    //       there are only 52 significant bits.
-    pub sum: Wrapping<u64>,
-}
-
-/// A latency in tenths of a millisecond.
-#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash)]
-pub struct Latency(u32);
-
-
-// ===== impl Histogram =====
-
-impl Histogram {
-
-    /// Observe a measurement
-    pub fn observe<I>(&mut self, measurement: I)
-    where
-        I: Into<Latency>,
-    {
-        let measurement = measurement.into();
-        let i = BUCKET_BOUNDS.iter()
-            .position(|max| &measurement <= max)
-            .expect("latency value greater than u32::MAX; this shouldn't be \
-                     possible.");
-        self.buckets[i].incr();
-        self.sum += Wrapping(measurement.0 as u64);
-    }
-
-    /// Return the sum value of this histogram in milliseconds.
-    ///
-    /// The sum is returned as a floating-point value, as it's
-    /// internally recorded in tenths of milliseconds, which could
-    /// represent a number of milliseconds with a fractional part.
-    pub fn sum_in_ms(&self) -> f64 {
-        self.sum.0 as f64 / MS_TO_TENTHS_OF_MS as f64
-    }
-
-}
-
-impl<I> ops::AddAssign<I> for Histogram
-where
-    I: Into<Latency>
-{
-    #[inline]
-    fn add_assign(&mut self, measurement: I) {
-        self.observe(measurement)
-    }
-
-}
-
-impl<'a> IntoIterator for &'a Histogram {
-    type Item = u64;
-    type IntoIter = iter::Map<
-        slice::Iter<'a, Counter>,
-        fn(&'a Counter) -> u64
-    >;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.buckets.iter().map(|&count| count.into())
-    }
-
-}
-
-// ===== impl Latency =====
-
-
-const SEC_TO_MS: u32 = 1_000;
-const SEC_TO_TENTHS_OF_A_MS: u32 = SEC_TO_MS * 10;
-const TENTHS_OF_MS_TO_NS: u32 = MS_TO_NS / 10;
-const MS_TO_TENTHS_OF_MS: u32 = 10;
-/// Conversion ratio from milliseconds to nanoseconds.
-pub const MS_TO_NS: u32 = 1_000_000;
-
-impl From<Duration> for Latency {
-    fn from(dur: Duration) -> Self {
-        let secs = dur.as_secs();
-        // checked conversion from u64 -> u32.
-        let secs =
-            if secs >= u64::from(u32::MAX) {
-                None
-            } else {
-                Some(secs as u32)
-            };
-        // represent the duration as tenths of a ms.
-        let tenths_of_ms = {
-            let t = secs.and_then(|as_secs|
-                // convert the number of seconds to tenths of a ms, or
-                // None on overflow.
-                as_secs.checked_mul(SEC_TO_TENTHS_OF_A_MS)
-            );
-            let t = t.and_then(|as_tenths_ms| {
-                // convert the subsecond part of the duration (in ns) to
-                // tenths of a millisecond.
-                let subsec_tenths_ms = dur.subsec_nanos() / TENTHS_OF_MS_TO_NS;
-                as_tenths_ms.checked_add(subsec_tenths_ms)
-            });
-            t.unwrap_or_else(|| {
-                debug!(
-                    "{:?} too large to represent as tenths of a \
-                     millisecond!",
-                    dur
-                );
-                u32::MAX
-            })
-        };
-        Latency(tenths_of_ms)
-    }
-
-}
-
-impl From<u32> for Latency {
-    #[inline]
-    fn from(value: u32) -> Self {
-        Latency(value)
+impl Into<u64> for Ms {
+    fn into(self) -> u64 {
+        self.0.as_secs().saturating_mul(1_000)
+            .saturating_add(u64::from(self.0.subsec_nanos()) / 1_000_000)
     }
 }
 
-impl Into<u32> for Latency {
-    fn into(self) -> u32 {
-        self.0
+impl Default for Histogram<Ms> {
+    fn default() -> Self {
+        Histogram::new(BOUNDS)
     }
 }
 
-impl fmt::Display for Latency {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        if self.0 == u32::MAX {
-            // Prometheus requires that text representations of numbers be in
-            // a format understandable by Go's strconv package. In particular,
-            // `-Inf`, `+Inf`, and `Nan` are used as the textual
-            // representations of floating point special values.
-            //
-            // We're representing latency buckets with u32s rather than floats,
-            // so we won't encounter these special values, but we want to treat
-            // the u32::MAX upper bound as the infinity bucket, so special case
-            // the formatting for u32::MAX.
-            write!(f, "+Inf")
-        } else {
-            // NOTE: if bucket values are changed so that they're no longer
-            //       evenly divisible by ten, we may want to ensure that there's
-            //       a reasonable cap on precision here.
-            write!(f, "{}", self.0 / MS_TO_TENTHS_OF_MS)
-        }
-    }
-}
+// impl Into<u64> for Us {
+//     fn into(self) -> u64 {
+//         self.0.as_secs().saturating_mul(1_000_000)
+//             .saturating_add(u64::from(self.0.subsec_nanos()) / 1_000)
+//     }
+// }
+
+// impl Default for Histogram<Us> {
+//     fn default() -> Self {
+//         Histogram::new(&BOUNDS)
+//     }
+// }
diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs
index 6e1bc2c5f..2aa867351 100644
--- a/proxy/src/telemetry/metrics/mod.rs
+++ b/proxy/src/telemetry/metrics/mod.rs
@@ -49,18 +49,19 @@ use telemetry::event::Event;
 
 mod counter;
 mod gauge;
+mod histogram;
 mod labels;
 mod latency;
 
 use self::counter::Counter;
 use self::gauge::Gauge;
+use self::histogram::Histogram;
 use self::labels::{
     RequestLabels,
     ResponseLabels,
     TransportLabels,
     TransportCloseLabels
 };
-use self::latency::{BUCKET_BOUNDS, Histogram};
 pub use self::labels::DstLabels;
 
 #[derive(Debug, Clone)]
@@ -68,7 +69,7 @@ struct Metrics {
     request_total: Metric<Counter, Arc<RequestLabels>>,
     response_total: Metric<Counter, Arc<ResponseLabels>>,
-    response_latency: Metric<Histogram, Arc<ResponseLabels>>,
+    response_latency: Metric<Histogram<latency::Ms>, Arc<ResponseLabels>>,
 
     tcp: TcpMetrics,
 
@@ -80,7 +81,7 @@ struct TcpMetrics {
     open_total: Metric<Counter, Arc<TransportLabels>>,
     close_total: Metric<Counter, Arc<TransportCloseLabels>>,
-    connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
+    connection_duration: Metric<Histogram<latency::Ms>, Arc<TransportCloseLabels>>,
 
     open_connections: Metric<Gauge, Arc<TransportLabels>>,
     write_bytes_total: Metric<Counter, Arc<TransportLabels>>,
@@ -141,7 +142,7 @@ impl Metrics {
             "A counter of the number of responses the proxy has received.",
         );
 
-        let response_latency = Metric::<Histogram, Arc<ResponseLabels>>::new(
+        let response_latency = Metric::<Histogram<latency::Ms>, Arc<ResponseLabels>>::new(
            "response_latency_ms",
            "A histogram of the total latency of a response. This is measured \
            from when the request headers are received to when the response \
@@ -167,7 +168,7 @@ impl Metrics {
 
     fn response_latency(&mut self,
                         labels: &Arc<ResponseLabels>)
-                        -> &mut Histogram {
+                        -> &mut Histogram<latency::Ms> {
         self.response_latency.values
             .entry(labels.clone())
             .or_insert_with(Histogram::default)
@@ -212,7 +213,7 @@ impl TcpMetrics {
             "A counter of the total number of transport connections.",
         );
 
-        let connection_duration = Metric::<Histogram, Arc<TransportCloseLabels>>::new(
+        let connection_duration = Metric::<Histogram<latency::Ms>, Arc<TransportCloseLabels>>::new(
             "tcp_connection_duration_ms",
             "A histogram of the duration of the lifetime of connections, in milliseconds",
         );
@@ -254,7 +255,7 @@ impl TcpMetrics {
             .or_insert_with(Counter::default)
     }
 
-    fn connection_duration(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Histogram {
+    fn connection_duration(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Histogram<latency::Ms> {
         self.connection_duration.values
             .entry(labels.clone())
             .or_insert_with(Histogram::default)
@@ -354,9 +355,10 @@ where
     }
 }
 
-impl<L> fmt::Display for Metric<Histogram, L> where
+impl<V, L> fmt::Display for Metric<Histogram<V>, L> where
     L: fmt::Display,
     L: Hash + Eq,
+    V: Into<u64>,
 {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f,
         )?;
 
         for (labels, histogram) in &self.values {
-            // Look up the bucket numbers against the BUCKET_BOUNDS array
-            // to turn them into upper bounds.
-            let bounds_and_counts = histogram.into_iter()
-                .enumerate()
-                .map(|(num, count)| (BUCKET_BOUNDS[num], count));
-
-            // Since Prometheus expects each bucket's value to be the sum of
-            // the number of values in this bucket and all lower buckets,
-            // track the total count here.
-            let mut total_count = 0;
-            for (le, count) in bounds_and_counts {
+            // Since Prometheus expects each bucket's value to be the sum of the number of
+            // values in this bucket and all lower buckets, track the total count here.
+            let mut total_count = 0u64;
+            for (le, count) in histogram.into_iter() {
                 // Add this bucket's count to the total count.
-                total_count += count;
+                let c: u64 = (*count).into();
+                total_count += c;
                 write!(f, "{name}_bucket{{{labels},le=\"{le}\"}} {count}\n",
                     name = self.name,
                     labels = labels,
@@ -395,7 +391,7 @@ impl<L> fmt::Display for Metric<Histogram, L> where
                 name = self.name,
                 labels = labels,
                 count = total_count,
-                sum = histogram.sum_in_ms(),
+                sum = histogram.sum(),
             )?;
         }
 
@@ -451,7 +447,7 @@ impl Aggregate {
             ));
             self.update(|metrics| {
                 metrics.response_total(&labels).incr();
-                *metrics.response_latency(&labels) += end.since_request_open;
+                metrics.response_latency(&labels).add(latency::Ms(end.since_request_open));
             });
         },
 
@@ -460,7 +456,7 @@ impl Aggregate {
             let labels = Arc::new(ResponseLabels::fail(res));
             self.update(|metrics| {
                 metrics.response_total(&labels).incr();
-                *metrics.response_latency(&labels) += fail.since_request_open;
+                metrics.response_latency(&labels).add(latency::Ms(fail.since_request_open));
             });
         },
 
@@ -479,7 +475,8 @@ impl Aggregate {
                 *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64;
                 *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
 
-                *metrics.tcp().connection_duration(&close_labels) += close.duration;
+                metrics.tcp().connection_duration(&close_labels)
+                    .add(latency::Ms(close.duration));
                 metrics.tcp().close_total(&close_labels).incr();
 
                 let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
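
Reviewer note (not part of the patch): a minimal sketch of how the new generic `Histogram` API is intended to be used, assuming the `Histogram`, `Bounds`, and `Bucket` types from `histogram.rs` and the `Ms` wrapper and `BOUNDS` from `latency.rs` above are in scope. The `SIZE_BOUNDS` constant and the test function name are purely hypothetical, for illustration only.

    use std::time::Duration;
    // Assumed imports, following the module layout introduced by this patch:
    // use self::histogram::{Bounds, Bucket, Histogram};
    // use self::latency::{Ms, BOUNDS};

    #[test]
    fn histogram_usage_sketch() {
        // A latency histogram over the default BOUNDS; `Ms` converts a
        // Duration into whole milliseconds via `Into<u64>` when added.
        let mut latencies: Histogram<Ms> = Histogram::default();
        latencies.add(Ms(Duration::from_millis(42)));
        assert_eq!(latencies.sum(), 42);

        // A histogram over a hypothetical custom set of bounds (e.g. sizes
        // in bytes); any `V: Into<u64>` works, including plain `u64`.
        static SIZE_BOUNDS: Bounds = Bounds(&[
            Bucket::Le(1_024),
            Bucket::Le(65_536),
            Bucket::Inf,
        ]);
        let mut sizes: Histogram<u64> = Histogram::new(&SIZE_BOUNDS);
        sizes.add(2_048);
        assert_eq!(sizes.sum(), 2_048);
    }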