diff --git a/proxy/src/telemetry/metrics/histogram.rs b/proxy/src/telemetry/metrics/histogram.rs index 9e0d65975..55f214c37 100644 --- a/proxy/src/telemetry/metrics/histogram.rs +++ b/proxy/src/telemetry/metrics/histogram.rs @@ -88,6 +88,96 @@ impl> Histogram { self.buckets[idx].incr(); self.sum += value; } + + // ===== Test-only methods to help with assertions about histograms. ===== + + /// Assert the bucket containing `le` has a count of at least `at_least`. + #[cfg(test)] + pub fn assert_bucket_at_least(&self, le: u64, at_least: u64) { + for (&bucket, &count) in self { + if bucket >= le { + let count: u64 = count.into(); + assert!( + count >= at_least, + "le={:?}; bucket={:?};", le, bucket + ); + break; + } + } + } + + /// Assert the bucket containing `le` has a count of exactly `exactly`. + #[cfg(test)] + pub fn assert_bucket_exactly(&self, le: u64, exactly: u64) -> &Self { + for (&bucket, &count) in self { + if bucket >= le { + let count: u64 = count.into(); + assert_eq!( + count, exactly, + "le={:?}; bucket={:?}; buckets={:#?};", + le, bucket, self.buckets, + ); + break; + } + } + self + } + + /// Assert all buckets less than the one containing `value` have + /// counts of exactly `exactly`. + #[cfg(test)] + pub fn assert_lt_exactly(&self, value: u64, exactly: u64) -> &Self { + for (i, &bucket) in self.bounds.0.iter().enumerate() { + let ceiling = match bucket { + Bucket::Le(c) => c, + Bucket::Inf => break, + }; + let next = self.bounds.0.get(i + 1) + .expect("Bucket::Le may not be the last in `bounds`!"); + + if value <= ceiling || next >= &value { + break; + } + + let count: u64 = self.buckets[i].into(); + assert_eq!( + count, exactly, + "bucket={:?}; value={:?};", + bucket, value, + ); + } + self + } + + /// Assert all buckets greater than the one containing `value` have + /// counts of exactly `exactly`. + #[cfg(test)] + pub fn assert_gt_exactly(&self, value: u64, exactly: u64) -> &Self { + // We set this to true after we've iterated past the first bucket + // whose upper bound is >= `value`. + let mut past_le = false; + for (&bucket, &count) in self { + if bucket < value { + continue; + } + + if bucket >= value && !past_le { + past_le = true; + continue; + } + + if past_le { + let count: u64 = count.into(); + assert_eq!( + count, exactly, + "bucket={:?}; value={:?};", + bucket, value, + ); + } + } + self + } + } impl<'a, V: Into> IntoIterator for &'a Histogram { @@ -175,6 +265,28 @@ impl cmp::PartialOrd for Bucket { } } +impl cmp::PartialEq for Bucket { + fn eq(&self, rhs: &u64) -> bool { + if let Bucket::Le(ref ceiling) = *self { + ceiling == rhs + } else { + // `self` is `Bucket::Inf`. + false + } + } +} + +impl cmp::PartialOrd for Bucket { + fn partial_cmp(&self, rhs: &u64) -> Option { + if let Bucket::Le(ref ceiling) = *self { + ceiling.partial_cmp(rhs) + } else { + // `self` is `Bucket::Inf`. + Some(cmp::Ordering::Greater) + } + } +} + impl cmp::Ord for Bucket { fn cmp(&self, rhs: &Bucket) -> cmp::Ordering { match (*self, *rhs) { @@ -193,7 +305,6 @@ mod tests { use std::u64; use std::collections::HashMap; - const NUM_BUCKETS: usize = 47; static BOUNDS: &'static Bounds = &Bounds(&[ Bucket::Le(10), Bucket::Le(20), @@ -248,17 +359,14 @@ mod tests { fn bucket_incremented(obs: u64) -> bool { let mut hist = Histogram::::new(&BOUNDS); hist.add(obs); - let incremented_bucket = &BOUNDS.0.iter() - .position(|bucket| match *bucket { - Bucket::Le(ceiling) => obs <= ceiling, - Bucket::Inf => true, - }) - .unwrap(); - for i in 0..NUM_BUCKETS { - let expected = if i == *incremented_bucket { 1 } else { 0 }; - let count: u64 = hist.buckets[i].into(); - assert_eq!(count, expected, "(for bucket <= {})", BOUNDS.0[i]); - } + // The bucket containing `obs` must have count 1. + hist.assert_bucket_exactly(obs, 1) + // All buckets less than the one containing `obs` must have + // counts of exactly 0. + .assert_lt_exactly(obs, 0) + // All buckets greater than the one containing `obs` must + // have counts of exactly 0. + .assert_gt_exactly(obs, 0); true } diff --git a/proxy/src/telemetry/metrics/http.rs b/proxy/src/telemetry/metrics/http.rs index 41b51d415..8ced24abd 100644 --- a/proxy/src/telemetry/metrics/http.rs +++ b/proxy/src/telemetry/metrics/http.rs @@ -54,6 +54,11 @@ impl RequestMetrics { pub fn end(&mut self) { self.total.incr(); } + + #[cfg(test)] + pub(super) fn total(&self) -> u64 { + self.total.into() + } } // ===== impl ResponseScopes ===== @@ -91,4 +96,14 @@ impl ResponseMetrics { self.total.incr(); self.latency.add(duration); } + + #[cfg(test)] + pub(super) fn total(&self) -> u64 { + self.total.into() + } + + #[cfg(test)] + pub(super) fn latency(&self) -> &Histogram { + &self.latency + } } diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs index 3ea855012..bd844526a 100644 --- a/proxy/src/telemetry/metrics/record.rs +++ b/proxy/src/telemetry/metrics/record.rs @@ -82,3 +82,219 @@ impl Record { }; } } + +#[cfg(test)] +mod test { + use telemetry::{ + event, + metrics::{self, labels}, + Event, + }; + use ctx::{self, test_util::* }; + use std::time::Duration; + + #[test] + fn record_response_end() { + let process = process(); + let proxy = ctx::Proxy::outbound(&process); + let server = server(&proxy); + + let client = client(&proxy, vec![ + ("service", "draymond"), + ("deployment", "durant"), + ("pod", "klay"), + ]); + + let (_, rsp) = request("http://buoyant.io", &server, &client, 1); + + let end = event::StreamResponseEnd { + grpc_status: None, + since_request_open: Duration::from_millis(300), + since_response_open: Duration::from_millis(0), + bytes_sent: 0, + frames_sent: 0, + }; + + let (mut r, _) = metrics::new(&process, Duration::from_secs(100)); + let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); + let labels = labels::ResponseLabels::new(&rsp, None); + + assert!(r.metrics.lock() + .expect("lock") + .responses.scopes + .get(&labels) + .is_none() + ); + + r.record_event(&ev); + { + let lock = r.metrics.lock() + .expect("lock"); + let scope = lock.responses.scopes + .get(&labels) + .expect("scope should be some after event"); + + assert_eq!(scope.total(), 1); + + scope.latency().assert_bucket_exactly(300, 1); + scope.latency().assert_lt_exactly(300, 0); + scope.latency().assert_gt_exactly(300, 0); + } + + } + + #[test] + fn record_one_conn_request() { + use self::Event::*; + use self::labels::*; + use std::sync::Arc; + + let process = process(); + let proxy = ctx::Proxy::outbound(&process); + let server = server(&proxy); + + let client = client(&proxy, vec![ + ("service", "draymond"), + ("deployment", "durant"), + ("pod", "klay"), + ]); + + let (req, rsp) = request("http://buoyant.io", &server, &client, 1); + let server_transport = + Arc::new(ctx::transport::Ctx::Server(server.clone())); + let client_transport = + Arc::new(ctx::transport::Ctx::Client(client.clone())); + let transport_close = event::TransportClose { + clean: true, + duration: Duration::from_secs(30_000), + rx_bytes: 4321, + tx_bytes: 4321, + }; + + let events = vec![ + TransportOpen(server_transport.clone()), + TransportOpen(client_transport.clone()), + StreamRequestOpen(req.clone()), + StreamRequestEnd(req.clone(), event::StreamRequestEnd { + since_request_open: Duration::from_millis(10), + }), + + StreamResponseOpen(rsp.clone(), event::StreamResponseOpen { + since_request_open: Duration::from_millis(300), + }), + StreamResponseEnd(rsp.clone(), event::StreamResponseEnd { + grpc_status: None, + since_request_open: Duration::from_millis(300), + since_response_open: Duration::from_millis(0), + bytes_sent: 0, + frames_sent: 0, + }), + TransportClose( + server_transport.clone(), + transport_close.clone(), + ), + TransportClose( + client_transport.clone(), + transport_close.clone(), + ), + ]; + + let (mut r, _) = metrics::new(&process, Duration::from_secs(1000)); + + let req_labels = RequestLabels::new(&req); + let rsp_labels = ResponseLabels::new(&rsp, None); + let srv_open_labels = TransportLabels::new(&server_transport); + let srv_close_labels = TransportCloseLabels::new( + &ctx::transport::Ctx::Server(server.clone()), + &transport_close, + ); + let client_open_labels = TransportLabels::new(&client_transport); + let client_close_labels = TransportCloseLabels::new( + &ctx::transport::Ctx::Client(client.clone()), + &transport_close, + ); + + { + let lock = r.metrics.lock() + .expect("lock"); + assert!(lock.requests.scopes.get(&req_labels).is_none()); + assert!(lock.responses.scopes.get(&rsp_labels).is_none()); + assert!(lock.transports.scopes.get(&srv_open_labels).is_none()); + assert!(lock.transports.scopes.get(&client_open_labels).is_none()); + assert!(lock.transport_closes.scopes.get(&srv_close_labels).is_none()); + assert!(lock.transport_closes.scopes.get(&client_close_labels).is_none()); + } + + for e in &events { + r.record_event(e); + } + + { + let lock = r.metrics.lock() + .expect("lock"); + + // === request scope ==================================== + assert_eq!( + lock.requests.scopes + .get(&req_labels) + .map(|scope| scope.total()), + Some(1) + ); + + // === response scope =================================== + let response_scope = lock + .responses.scopes + .get(&rsp_labels) + .expect("response scope missing"); + assert_eq!(response_scope.total(), 1); + + response_scope.latency() + .assert_bucket_exactly(300, 1) + .assert_gt_exactly(300, 0) + .assert_lt_exactly(300, 0); + + // === server transport open scope ====================== + let srv_transport_scope = lock + .transports.scopes + .get(&srv_open_labels) + .expect("server transport scope missing"); + assert_eq!(srv_transport_scope.open_total(), 1); + assert_eq!(srv_transport_scope.write_bytes_total(), 4321); + assert_eq!(srv_transport_scope.read_bytes_total(), 4321); + + // === client transport open scope ====================== + let client_transport_scope = lock + .transports.scopes + .get(&client_open_labels) + .expect("client transport scope missing"); + assert_eq!(client_transport_scope.open_total(), 1); + assert_eq!(client_transport_scope.write_bytes_total(), 4321); + assert_eq!(client_transport_scope.read_bytes_total(), 4321); + + let transport_duration: u64 = 30_000 * 1_000; + + // === server transport close scope ===================== + let srv_transport_close_scope = lock + .transport_closes.scopes + .get(&srv_close_labels) + .expect("server transport close scope missing"); + assert_eq!(srv_transport_close_scope.close_total(), 1); + srv_transport_close_scope.connection_duration() + .assert_bucket_exactly(transport_duration, 1) + .assert_gt_exactly(transport_duration, 0) + .assert_lt_exactly(transport_duration, 0); + + // === client transport close scope ===================== + let client_transport_close_scope = lock + .transport_closes.scopes + .get(&client_close_labels) + .expect("client transport close scope missing"); + assert_eq!(client_transport_close_scope.close_total(), 1); + client_transport_close_scope.connection_duration() + .assert_bucket_exactly(transport_duration, 1) + .assert_gt_exactly(transport_duration, 0) + .assert_lt_exactly(transport_duration, 0); + } + } + +} diff --git a/proxy/src/telemetry/metrics/transport.rs b/proxy/src/telemetry/metrics/transport.rs index 041966987..228dcf1b2 100644 --- a/proxy/src/telemetry/metrics/transport.rs +++ b/proxy/src/telemetry/metrics/transport.rs @@ -77,6 +77,21 @@ impl OpenMetrics { self.read_bytes_total += rx; self.write_bytes_total += tx; } + + #[cfg(test)] + pub(super) fn open_total(&self) -> u64 { + self.open_total.into() + } + + #[cfg(test)] + pub(super) fn read_bytes_total(&self) -> u64 { + self.read_bytes_total.into() + } + + #[cfg(test)] + pub(super) fn write_bytes_total(&self) -> u64 { + self.write_bytes_total.into() + } } // ===== impl CloseScopes ===== @@ -111,4 +126,14 @@ impl CloseMetrics { self.close_total.incr(); self.connection_duration.add(duration); } + + #[cfg(test)] + pub(super) fn close_total(&self) -> u64 { + self.close_total.into() + } + + #[cfg(test)] + pub(super) fn connection_duration(&self) -> &Histogram { + &self.connection_duration + } }