From 810f6bb7196ca505d713b690e8dbfe44c86d9d30 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 30 Apr 2018 16:11:12 -0700 Subject: [PATCH] proxy: Expire metrics that have not been updated for 10 minutes (#880) The proxy is now configured with the CONDUIT_PROXY_METRICS_RETAIN_IDLE environment variable that dictates the amount of time that the proxy will retain metrics that have not been updated. A timestamp is maintained for each unique set of labels, indicating the last time that the scope was updated. Then, when metrics are read, all metrics older than CONDUIT_PROXY_METRICS_RETAIN_IDLE are dropped from the stats registry. A ctx::test_util module has been added to aid testing. Fixes #819 --- proxy/src/config.rs | 10 ++ proxy/src/ctx/mod.rs | 58 ++++++++++ proxy/src/lib.rs | 1 + proxy/src/telemetry/control.rs | 7 +- proxy/src/telemetry/metrics/http.rs | 7 +- proxy/src/telemetry/metrics/labels.rs | 2 +- proxy/src/telemetry/metrics/mod.rs | 140 +++++++++++++++++++++-- proxy/src/telemetry/metrics/serve.rs | 10 +- proxy/src/telemetry/metrics/transport.rs | 7 +- proxy/src/telemetry/mod.rs | 4 +- 10 files changed, 227 insertions(+), 19 deletions(-) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 62acc372e..b1aea398d 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -53,6 +53,9 @@ pub struct Config { /// Event queue capacity. pub event_buffer_capacity: usize, + /// Age after which metrics may be dropped. + pub metrics_retain_idle: Duration, + /// Timeout after which to cancel binding a request. 
pub bind_timeout: Duration, @@ -128,6 +131,7 @@ pub const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD"; pub const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER"; pub const ENV_CONTROL_LISTENER: &str = "CONDUIT_PROXY_CONTROL_LISTENER"; pub const ENV_METRICS_LISTENER: &str = "CONDUIT_PROXY_METRICS_LISTENER"; +pub const ENV_METRICS_RETAIN_IDLE: &str = "CONDUIT_PROXY_METRICS_RETAIN_IDLE"; const ENV_PRIVATE_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PRIVATE_CONNECT_TIMEOUT"; const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT"; pub const ENV_BIND_TIMEOUT: &str = "CONDUIT_PROXY_BIND_TIMEOUT"; @@ -148,6 +152,7 @@ const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; const DEFAULT_METRICS_LISTENER: &str = "tcp://127.0.0.1:4191"; +const DEFAULT_METRICS_RETAIN_IDLE: u64 = 10 * 60; // Ten minutes const DEFAULT_PRIVATE_CONNECT_TIMEOUT_MS: u64 = 20; const DEFAULT_PUBLIC_CONNECT_TIMEOUT_MS: u64 = 300; const DEFAULT_BIND_TIMEOUT_MS: u64 = 10_000; // ten seconds, as in Linkerd. @@ -182,6 +187,7 @@ impl<'a> TryFrom<&'a Strings> for Config { let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number); let resolv_conf_path = strings.get(ENV_RESOLV_CONF); let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number); + let metrics_retain_idle = parse(strings, ENV_METRICS_RETAIN_IDLE, parse_number); let pod_namespace = strings.get(ENV_POD_NAMESPACE).and_then(|maybe_value| { // There cannot be a default pod namespace, and the pod namespace is required. 
maybe_value.ok_or_else(|| { @@ -236,6 +242,10 @@ impl<'a> TryFrom<&'a Strings> for Config { control_host_and_port: control_host_and_port?, event_buffer_capacity: event_buffer_capacity?.unwrap_or(DEFAULT_EVENT_BUFFER_CAPACITY), + metrics_retain_idle: Duration::from_secs( + metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE) + ), + bind_timeout: Duration::from_millis(bind_timeout?.unwrap_or(DEFAULT_BIND_TIMEOUT_MS)), pod_namespace: pod_namespace?, diff --git a/proxy/src/ctx/mod.rs b/proxy/src/ctx/mod.rs index 49a8020a8..5da01600c 100644 --- a/proxy/src/ctx/mod.rs +++ b/proxy/src/ctx/mod.rs @@ -74,3 +74,61 @@ impl Proxy { !self.is_inbound() } } + +#[cfg(test)] +pub mod test_util { + use http; + use futures_watch; + use std::{ + fmt, + net::SocketAddr, + sync::Arc, + time::SystemTime, + }; + + use ctx; + use telemetry::metrics::DstLabels; + + fn addr() -> SocketAddr { + ([1, 2, 3, 4], 5678).into() + } + + pub fn process() -> Arc { + Arc::new(ctx::Process { + scheduled_namespace: "test".into(), + start_time: SystemTime::now(), + }) + } + + pub fn server(proxy: &Arc) -> Arc { + ctx::transport::Server::new(&proxy, &addr(), &addr(), &Some(addr())) + } + + pub fn client(proxy: &Arc, labels: L) -> Arc + where + L: IntoIterator, + S: fmt::Display, + { + let (labels_watch, _store) = futures_watch::Watch::new(DstLabels::new(labels)); + ctx::transport::Client::new(&proxy, &addr(), Some(labels_watch)) + } + + pub fn request( + uri: &str, + server: &Arc, + client: &Arc, + id: usize + ) -> (Arc, Arc) { + let req = ctx::http::Request::new( + &http::Request::get(uri).body(()).unwrap(), + &server, + &client, + id, + ); + let rsp = ctx::http::Response::new( + &http::Response::builder().status(http::StatusCode::OK).body(()).unwrap(), + &req, + ); + (req, rsp) + } +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 69c328cda..39388bfe2 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -200,6 +200,7 @@ where let (sensors, telemetry) = telemetry::new( &process_ctx, 
config.event_buffer_capacity, + config.metrics_retain_idle, ); let dns_config = dns::Config::from_system_config() diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index 97f125b11..344520c79 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -1,5 +1,6 @@ use std::io; use std::sync::{Arc, Mutex}; +use std::time::Duration; use futures::{future, Async, Future, Poll, Stream}; use futures_mpsc_lossy::Receiver; @@ -18,6 +19,8 @@ pub struct MakeControl { rx: Receiver, process_ctx: Arc, + + metrics_retain_idle: Duration, } /// Handles the receipt of events. @@ -59,10 +62,12 @@ impl MakeControl { pub(super) fn new( rx: Receiver, process_ctx: &Arc, + metrics_retain_idle: Duration, ) -> Self { Self { rx, process_ctx: Arc::clone(process_ctx), + metrics_retain_idle, } } @@ -77,7 +82,7 @@ impl MakeControl { /// - `Err(io::Error)` if the timeout could not be created. pub fn make_control(self, taps: &Arc>, handle: &Handle) -> io::Result { let (metrics_record, metrics_service) = - metrics::new(&self.process_ctx); + metrics::new(&self.process_ctx, self.metrics_retain_idle); Ok(Control { metrics_record, diff --git a/proxy/src/telemetry/metrics/http.rs b/proxy/src/telemetry/metrics/http.rs index 7b6859d26..41b51d415 100644 --- a/proxy/src/telemetry/metrics/http.rs +++ b/proxy/src/telemetry/metrics/http.rs @@ -8,17 +8,18 @@ use super::{ Metric, RequestLabels, ResponseLabels, - Scopes + Scopes, + Stamped, }; -pub(super) type RequestScopes = Scopes; +pub(super) type RequestScopes = Scopes>; #[derive(Debug, Default)] pub(super) struct RequestMetrics { total: Counter, } -pub(super) type ResponseScopes = Scopes; +pub(super) type ResponseScopes = Scopes>; #[derive(Debug, Default)] pub struct ResponseMetrics { diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index ac16e2f67..ad5f12641 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -81,7 +81,7 
@@ pub struct DstLabels { // ===== impl RequestLabels ===== -impl<'a> RequestLabels { +impl RequestLabels { pub fn new(req: &ctx::http::Request) -> Self { let direction = Direction::from_context(req.server.proxy.as_ref()); diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index e8074e842..20ef2026e 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -31,7 +31,7 @@ use std::fmt::{self, Display}; use std::hash::Hash; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; -use std::time::UNIX_EPOCH; +use std::time::{UNIX_EPOCH, Duration, Instant}; use indexmap::IndexMap; @@ -120,15 +120,21 @@ struct Scopes { scopes: IndexMap, } +#[derive(Debug)] +struct Stamped { + stamp: Instant, + inner: T, +} + /// Construct the Prometheus metrics. /// /// Returns the `Record` and `Serve` sides. The `Serve` side /// is a Hyper service which can be used to create the server for the /// scrape endpoint, while the `Record` side can receive updates to the /// metrics by calling `record_event`. 
-pub fn new(process: &Arc) -> (Record, Serve){ +pub fn new(process: &Arc, idle_retain: Duration) -> (Record, Serve){ let metrics = Arc::new(Mutex::new(Root::new(process))); - (Record::new(&metrics), Serve::new(&metrics)) + (Record::new(&metrics), Serve::new(&metrics, idle_retain)) } // ===== impl Metric ===== @@ -184,22 +190,33 @@ impl Root { fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics { self.requests.scopes.entry(labels) - .or_insert_with(http::RequestMetrics::default) + .or_insert_with(|| http::RequestMetrics::default().into()) + .stamped() } fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics { self.responses.scopes.entry(labels) - .or_insert_with(http::ResponseMetrics::default) + .or_insert_with(|| http::ResponseMetrics::default().into()) + .stamped() } fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics { self.transports.scopes.entry(labels) - .or_insert_with(transport::OpenMetrics::default) + .or_insert_with(|| transport::OpenMetrics::default().into()) + .stamped() } fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics { self.transport_closes.scopes.entry(labels) - .or_insert_with(transport::CloseMetrics::default) + .or_insert_with(|| transport::CloseMetrics::default().into()) + .stamped() + } + + fn retain_since(&mut self, epoch: Instant) { + self.requests.retain_since(epoch); + self.responses.retain_since(epoch); + self.transports.retain_since(epoch); + self.transport_closes.retain_since(epoch); } } @@ -217,10 +234,117 @@ impl fmt::Display for Root { } } +// ===== impl Stamped ===== + +impl Stamped { + fn stamped(&mut self) -> &mut T { + self.stamp = Instant::now(); + &mut self.inner + } +} + +impl From for Stamped { + fn from(inner: T) -> Self { + Self { + inner, + stamp: Instant::now(), + } + } +} + +impl ::std::ops::Deref for Stamped { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + // ===== impl Scopes 
===== -impl Default for Scopes { +impl Default for Scopes { fn default() -> Self { Scopes { scopes: IndexMap::default(), } } } + +impl Scopes> { + fn retain_since(&mut self, epoch: Instant) { + self.scopes.retain(|_, v| v.stamp >= epoch); + } +} + +#[cfg(test)] +mod tests { + use ctx::test_util::*; + use telemetry::event; + use super::*; + + fn mock_route( + root: &mut Root, + proxy: &Arc, + server: &Arc, + team: &str + ) { + let client = client(&proxy, vec![("team", team)]); + let (req, rsp) = request("http://nba.com", &server, &client, 1); + + let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); + let transport = TransportLabels::new(&client_transport); + root.transport(transport.clone()).open(); + + root.request(RequestLabels::new(&req)).end(); + root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); + root.transport(transport).close(100, 200); + + let end = TransportCloseLabels::new(&client_transport, &event::TransportClose { + clean: true, + duration: Duration::from_millis(15), + rx_bytes: 40, + tx_bytes: 0, + }); + root.transport_close(end).close(Duration::from_millis(15)); + } + + #[test] + fn expiry() { + let process = process(); + let proxy = ctx::Proxy::outbound(&process); + + let server = server(&proxy); + let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone())); + + let mut root = Root::default(); + + let t0 = Instant::now(); + root.transport(TransportLabels::new(&server_transport)).open(); + + mock_route(&mut root, &proxy, &server, "warriors"); + let t1 = Instant::now(); + + mock_route(&mut root, &proxy, &server, "sixers"); + let t2 = Instant::now(); + + assert_eq!(root.requests.scopes.len(), 2); + assert_eq!(root.responses.scopes.len(), 2); + assert_eq!(root.transports.scopes.len(), 2); + assert_eq!(root.transport_closes.scopes.len(), 1); + + root.retain_since(t0); + assert_eq!(root.requests.scopes.len(), 2); + assert_eq!(root.responses.scopes.len(), 2); + 
assert_eq!(root.transports.scopes.len(), 2); + assert_eq!(root.transport_closes.scopes.len(), 1); + + root.retain_since(t1); + assert_eq!(root.requests.scopes.len(), 1); + assert_eq!(root.responses.scopes.len(), 1); + assert_eq!(root.transports.scopes.len(), 1); + assert_eq!(root.transport_closes.scopes.len(), 1); + + root.retain_since(t2); + assert_eq!(root.requests.scopes.len(), 0); + assert_eq!(root.responses.scopes.len(), 0); + assert_eq!(root.transports.scopes.len(), 0); + assert_eq!(root.transport_closes.scopes.len(), 0); + } +} diff --git a/proxy/src/telemetry/metrics/serve.rs b/proxy/src/telemetry/metrics/serve.rs index af940e47c..e52cae71b 100644 --- a/proxy/src/telemetry/metrics/serve.rs +++ b/proxy/src/telemetry/metrics/serve.rs @@ -6,6 +6,7 @@ use hyper::header::{AcceptEncoding, ContentEncoding, ContentType, Encoding, Qual use hyper::server::{Request, Response, Service}; use std::io::Write; use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use super::Root; @@ -13,14 +14,16 @@ use super::Root; #[derive(Debug, Clone)] pub struct Serve { metrics: Arc>, + idle_retain: Duration, } // ===== impl Serve ===== impl Serve { - pub(super) fn new(metrics: &Arc>) -> Self { + pub(super) fn new(metrics: &Arc>, idle_retain: Duration) -> Self { Serve { metrics: metrics.clone(), + idle_retain, } } @@ -49,9 +52,12 @@ impl Service for Serve { .with_status(StatusCode::NotFound)); } - let metrics = self.metrics.lock() + let mut metrics = self.metrics.lock() .expect("metrics lock poisoned"); + metrics.retain_since(Instant::now() - self.idle_retain); + let metrics = metrics; + let resp = if Self::is_gzip(&req) { trace!("gzipping metrics"); let mut writer = GzEncoder::new(Vec::::new(), CompressionOptions::fast()); diff --git a/proxy/src/telemetry/metrics/transport.rs b/proxy/src/telemetry/metrics/transport.rs index f8dfb9cd2..041966987 100644 --- a/proxy/src/telemetry/metrics/transport.rs +++ b/proxy/src/telemetry/metrics/transport.rs @@ -9,10 +9,11 @@ use 
super::{ Metric, TransportLabels, TransportCloseLabels, - Scopes + Scopes, + Stamped, }; -pub(super) type OpenScopes = Scopes; +pub(super) type OpenScopes = Scopes>; #[derive(Debug, Default)] pub(super) struct OpenMetrics { @@ -22,7 +23,7 @@ pub(super) struct OpenMetrics { read_bytes_total: Counter, } -pub(super) type CloseScopes = Scopes; +pub(super) type CloseScopes = Scopes>; #[derive(Debug, Default)] pub(super) struct CloseMetrics { diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index 5056e7ffd..99a408335 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use futures_mpsc_lossy; @@ -30,9 +31,10 @@ pub use self::sensor::Sensors; pub fn new( process: &Arc, capacity: usize, + metrics_retain_idle: Duration, ) -> (Sensors, MakeControl) { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let s = Sensors::new(tx); - let c = MakeControl::new(rx, process); + let c = MakeControl::new(rx, process, metrics_retain_idle); (s, c) }