From abc75a506dc3aa91465a863d7e99446efb92cce9 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 8 Aug 2018 14:21:19 -0700 Subject: [PATCH] Share a common tls_config_reload metrics object (#46) The current implementation of TLS config reload telemetry has several layers: a sensor emits events into a dispatcher that updates metrics. This can be simplified by sharing a common metrics object between the metrics registry and config module. The metrics registry is updated to hold a read-only `tls_config_reload::Fmt`; and the config module holds a `tls_config_reload::Sensor`. The `Sensor` type holds a strong reference to an inner metrics structure and acquires a lock on updates. The `Fmt` type holds a weak reference to the metrics structure so that the metrics server can print the state as long as it's actually held in memory. If the `Sensor` is dropped (for instance, because TLS has been administratively disabled), then no metrics will be formatted by `Fmt`. --- benches/record.rs | 6 +- src/lib.rs | 4 +- src/telemetry/event.rs | 5 +- src/telemetry/metrics/mod.rs | 18 ++- src/telemetry/metrics/record.rs | 10 +- src/telemetry/metrics/tls_config_reload.rs | 141 ++++++++++++++------- src/telemetry/mod.rs | 7 +- src/telemetry/sensor/mod.rs | 22 +--- src/transport/tls/config.rs | 6 +- 9 files changed, 116 insertions(+), 103 deletions(-) diff --git a/benches/record.rs b/benches/record.rs index c0a759b01..ab9eeba3a 100644 --- a/benches/record.rs +++ b/benches/record.rs @@ -102,7 +102,7 @@ fn record_response_end(b: &mut Bencher) { frames_sent: 0, }; - let (mut r, _) = metrics::new(&process, Duration::from_secs(1000)); + let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default()); b.iter(|| r.record_event(&Event::StreamResponseEnd(rsp.clone(), end.clone()))); } @@ -169,7 +169,7 @@ fn record_one_conn_request(b: &mut Bencher) { }), ]; - let (mut r, _) = metrics::new(&process, Duration::from_secs(1000)); + let (mut r, _) = metrics::new(&process, 
Duration::from_secs(1000), Default::default()); b.iter(|| for e in &events { r.record_event(e); }); } @@ -239,6 +239,6 @@ fn record_many_dsts(b: &mut Bencher) { tx_bytes: 4321, })); - let (mut r, _) = metrics::new(&process, Duration::from_secs(1000)); + let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default()); b.iter(|| for e in &events { r.record_event(e); }); } diff --git a/src/lib.rs b/src/lib.rs index a0abb7e2f..0546c3121 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -247,14 +247,14 @@ where ); let (taps, observe) = control::Observe::new(100); - let (sensors, metrics_server) = telemetry::new( + let (sensors, tls_config_sensor, metrics_server) = telemetry::new( &process_ctx, config.metrics_retain_idle, &taps, ); let tls_client_config = tls_config_watch.client.clone(); - let tls_cfg_bg = tls_config_watch.start(sensors.tls_config()); + let tls_cfg_bg = tls_config_watch.start(tls_config_sensor); let controller_tls = config.tls_settings.as_ref().and_then(|settings| { settings.controller_identity.as_ref().map(|controller_identity| { diff --git a/src/telemetry/event.rs b/src/telemetry/event.rs index 08465f296..65af9d36a 100644 --- a/src/telemetry/event.rs +++ b/src/telemetry/event.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant}; use h2; @@ -17,9 +17,6 @@ pub enum Event { StreamResponseOpen(Arc, StreamResponseOpen), StreamResponseFail(Arc, StreamResponseFail), StreamResponseEnd(Arc, StreamResponseEnd), - - TlsConfigReloaded(SystemTime), - TlsConfigReloadFailed(::transport::tls::ConfigError), } #[derive(Clone, Debug)] diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index 7013dc10e..1df8c8d94 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -59,7 +59,7 @@ mod latency; mod process; mod record; mod serve; -mod tls_config_reload; +pub mod tls_config_reload; mod transport; use self::counter::Counter; @@ -74,7 +74,6 @@ 
use self::labels::{ pub use self::labels::DstLabels; pub use self::record::Record; pub use self::serve::Serve; -pub use self::tls_config_reload::TlsConfigReload; /// Writes a metric in prometheus-formatted output. /// @@ -111,7 +110,7 @@ struct Root { responses: http::ResponseScopes, transports: transport::OpenScopes, transport_closes: transport::CloseScopes, - tls_config_reload: TlsConfigReload, + tls_config_reload: tls_config_reload::Fmt, process: process::Process, } @@ -136,8 +135,10 @@ struct Stamped { /// is a Hyper service which can be used to create the server for the /// scrape endpoint, while the `Record` side can receive updates to the /// metrics by calling `record_event`. -pub fn new(process: &Arc, idle_retain: Duration) -> (Record, Serve){ - let metrics = Arc::new(Mutex::new(Root::new(process))); +pub fn new(process: &Arc, idle_retain: Duration, tls: tls_config_reload::Fmt) + -> (Record, Serve) +{ + let metrics = Arc::new(Mutex::new(Root::new(process, tls))); (Record::new(&metrics), Serve::new(&metrics, idle_retain)) } @@ -174,9 +175,10 @@ impl<'a, M: FmtMetric> Metric<'a, M> { // ===== impl Root ===== impl Root { - pub fn new(process: &Arc) -> Self { + pub fn new(process: &Arc, tls_config_reload: tls_config_reload::Fmt) -> Self { Self { process: process::Process::new(&process), + tls_config_reload, .. 
Root::default() } } @@ -205,10 +207,6 @@ impl Root { .stamped() } - fn tls_config(&mut self) -> &mut TlsConfigReload { - &mut self.tls_config_reload - } - fn retain_since(&mut self, epoch: Instant) { self.requests.retain_since(epoch); self.responses.retain_since(epoch); diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/metrics/record.rs index 23eaeb736..3d324bc7f 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/metrics/record.rs @@ -82,12 +82,6 @@ impl Record { .close(close.duration); }) }, - - Event::TlsConfigReloaded(ref when) => - self.update(|m| m.tls_config().success(when)), - - Event::TlsConfigReloadFailed(ref err) => - self.update(|m| m.tls_config().error(err.clone())), }; } } @@ -135,7 +129,7 @@ mod test { frames_sent: 0, }; - let (mut r, _) = metrics::new(&process, Duration::from_secs(100)); + let (mut r, _) = metrics::new(&process, Duration::from_secs(100), Default::default()); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); let labels = labels::ResponseLabels::new(&rsp, None); @@ -230,7 +224,7 @@ mod test { ), ]; - let (mut r, _) = metrics::new(&process, Duration::from_secs(1000)); + let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default()); let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); diff --git a/src/telemetry/metrics/tls_config_reload.rs b/src/telemetry/metrics/tls_config_reload.rs index 280166463..c393e3483 100644 --- a/src/telemetry/metrics/tls_config_reload.rs +++ b/src/telemetry/metrics/tls_config_reload.rs @@ -1,6 +1,11 @@ -use std::{fmt, path::PathBuf, time::{SystemTime, UNIX_EPOCH}}; +use std::{ + fmt, + path::PathBuf, + sync::{Arc, Mutex, Weak}, + time::{SystemTime, UNIX_EPOCH}, +}; -use telemetry::metrics::{Counter, Gauge, Metric, Scopes, labels::Errno}; +use telemetry::metrics::{labels::Errno, Counter, Gauge, Metric, Scopes}; use transport::tls; metrics! { @@ -13,8 +18,25 @@ metrics! 
{ } } +/// Constructs a Sensor/Fmt pair for TLS config reload metrics. +pub fn new() -> (Sensor, Fmt) { + let inner = Arc::new(Mutex::new(Inner::default())); + let fmt = Fmt(Arc::downgrade(&inner)); + (Sensor(inner), fmt) +} + +/// Supports recording TLS config reload metrics. +/// +/// When this type is dropped, its metrics may no longer be formatted for Prometheus. +#[derive(Debug)] +pub struct Sensor(Arc>); + +/// Formats metrics for Prometheus for a corresponding `Sensor`. #[derive(Debug, Default)] -pub struct TlsConfigReload { +pub struct Fmt(Weak>); + +#[derive(Debug, Default)] +struct Inner { last_reload: Option, by_status: Scopes, } @@ -25,41 +47,61 @@ enum Status { InvalidTrustAnchors, InvalidPrivateKey, InvalidEndEntityCert, - Io { path: PathBuf, errno: Option, }, + Io { path: PathBuf, errno: Option }, } -// ===== impl TlsConfigReload ===== +// ===== impl Sensor ===== -impl TlsConfigReload { - pub fn success(&mut self, when: &SystemTime) { - let t = when +impl Sensor { + pub fn reloaded(&mut self) { + let t = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("times must be after UNIX epoch") .as_secs(); - self.last_reload = Some(t.into()); - self.by_status.scopes - .entry(Status::Reloaded) - .or_insert_with(|| Counter::default()) - .incr() + if let Ok(mut inner) = self.0.lock() { + inner.last_reload = Some(t.into()); + + inner + .by_status + .scopes + .entry(Status::Reloaded) + .or_insert_with(|| Counter::default()) + .incr(); + } } - pub fn error(&mut self, e: tls::ConfigError) { - self.by_status.scopes - .entry(e.into()) - .or_insert_with(|| Counter::default()) - .incr() + pub fn failed(&mut self, e: tls::ConfigError) { + if let Ok(mut inner) = self.0.lock() { + inner + .by_status + .scopes + .entry(e.into()) + .or_insert_with(|| Counter::default()) + .incr(); + } } } -impl fmt::Display for TlsConfigReload { +// ===== impl Fmt ===== + +impl fmt::Display for Fmt { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if 
!self.by_status.scopes.is_empty() { + let lock = match self.0.upgrade() { + None => return Ok(()), + Some(lock) => lock, + }; + let inner = match lock.lock() { + Err(_) => return Ok(()), + Ok(inner) => inner, + }; + + if !inner.by_status.scopes.is_empty() { tls_config_reload_total.fmt_help(f)?; - tls_config_reload_total.fmt_scopes(f, &self.by_status, |s| &s)?; + tls_config_reload_total.fmt_scopes(f, &inner.by_status, |s| &s)?; } - if let Some(timestamp) = self.last_reload { + if let Some(timestamp) = inner.last_reload { tls_config_last_reload_seconds.fmt_help(f)?; tls_config_last_reload_seconds.fmt_metric(f, timestamp)?; } @@ -73,14 +115,13 @@ impl fmt::Display for TlsConfigReload { impl From for Status { fn from(err: tls::ConfigError) -> Self { match err { - tls::ConfigError::Io(path, error_code) => - Status::Io { path, errno: error_code.map(Errno::from) }, - tls::ConfigError::FailedToParseTrustAnchors(_) => - Status::InvalidTrustAnchors, - tls::ConfigError::EndEntityCertIsNotValid(_) => - Status::InvalidEndEntityCert, - tls::ConfigError::InvalidPrivateKey => - Status::InvalidPrivateKey, + tls::ConfigError::Io(path, error_code) => Status::Io { + path, + errno: error_code.map(Errno::from), + }, + tls::ConfigError::FailedToParseTrustAnchors(_) => Status::InvalidTrustAnchors, + tls::ConfigError::EndEntityCertIsNotValid(_) => Status::InvalidEndEntityCert, + tls::ConfigError::InvalidPrivateKey => Status::InvalidPrivateKey, } } } @@ -88,25 +129,27 @@ impl From for Status { impl fmt::Display for Status { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Status::Reloaded => - f.pad("status=\"reloaded\""), - Status::Io { ref path, errno: Some(errno) } => - write!(f, - "status=\"io_error\",path=\"{}\",errno=\"{}\"", - path.display(), errno - ), - Status::Io { ref path, errno: None } => - write!(f, - "status=\"io_error\",path=\"{}\",errno=\"UNKNOWN\"", - path.display(), - ), - Status::InvalidPrivateKey => - f.pad("status=\"invalid_private_key\""), - 
Status::InvalidEndEntityCert => - f.pad("status=\"invalid_end_entity_cert\""), - Status::InvalidTrustAnchors => - f.pad("status=\"invalid_trust_anchors\""), + Status::Reloaded => f.pad("status=\"reloaded\""), + Status::Io { + ref path, + errno: Some(errno), + } => write!( + f, + "status=\"io_error\",path=\"{}\",errno=\"{}\"", + path.display(), + errno + ), + Status::Io { + ref path, + errno: None, + } => write!( + f, + "status=\"io_error\",path=\"{}\",errno=\"UNKNOWN\"", + path.display(), + ), + Status::InvalidPrivateKey => f.pad("status=\"invalid_private_key\""), + Status::InvalidEndEntityCert => f.pad("status=\"invalid_end_entity_cert\""), + Status::InvalidTrustAnchors => f.pad("status=\"invalid_trust_anchors\""), } } } - diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index a6b7d6b94..8f36b27d7 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -15,8 +15,9 @@ pub fn new( process: &Arc, metrics_retain_idle: Duration, taps: &Arc>, -) -> (Sensors, metrics::Serve) { - let (metrics_record, metrics_serve) = metrics::new(process, metrics_retain_idle); +) -> (Sensors, metrics::tls_config_reload::Sensor, metrics::Serve) { + let (tls_config_sensor, tls_config_fmt) = metrics::tls_config_reload::new(); + let (metrics_record, metrics_serve) = metrics::new(process, metrics_retain_idle, tls_config_fmt); let s = Sensors::new(metrics_record, taps); - (s, metrics_serve) + (s, tls_config_sensor, metrics_serve) } diff --git a/src/telemetry/sensor/mod.rs b/src/telemetry/sensor/mod.rs index 7d8ddc40b..ad6384ec4 100644 --- a/src/telemetry/sensor/mod.rs +++ b/src/telemetry/sensor/mod.rs @@ -9,7 +9,7 @@ use tower_h2::Body; use ctx; use telemetry::{event, metrics, tap}; -use transport::{Connection, tls}; +use transport::Connection; use transparency::ClientError; pub mod http; @@ -32,10 +32,6 @@ struct Handle(Option); #[derive(Clone, Debug)] pub struct Sensors(Option); -/// Given to the TLS config watch to generate events on reloads. 
-#[derive(Clone, Debug)] -pub struct TlsConfig(Handle); - impl Handle { fn send(&mut self, mk: F) where @@ -104,20 +100,4 @@ impl Sensors { { NewHttp::new(new_service, Handle(self.0.clone()), client_ctx) } - - pub fn tls_config(&self) -> TlsConfig { - TlsConfig(Handle(self.0.clone())) - } -} - -impl TlsConfig { - - pub fn reloaded(&mut self) { - use std::time::SystemTime; - self.0.send(|| event::Event::TlsConfigReloaded(SystemTime::now())) - } - - pub fn failed(&mut self, err: tls::ConfigError) { - self.0.send(|| event::Event::TlsConfigReloadFailed(err.into())) - } } diff --git a/src/transport/tls/config.rs b/src/transport/tls/config.rs index f3c1ff916..978dd5318 100644 --- a/src/transport/tls/config.rs +++ b/src/transport/tls/config.rs @@ -16,7 +16,7 @@ use super::{ webpki, }; use conditional::Conditional; -use telemetry::sensor; +use telemetry::metrics::tls_config_reload; use futures::{future, stream, Future, Stream}; use futures_watch::{Store, Watch}; @@ -182,7 +182,7 @@ impl CommonSettings { fn stream_changes( self, interval: Duration, - mut sensor: sensor::TlsConfig, + mut sensor: tls_config_reload::Sensor, ) -> impl Stream { let paths = self.paths().iter() @@ -337,7 +337,7 @@ impl ConfigWatch { /// /// The returned task Future is expected to never complete. If TLS is /// disabled then an empty future is returned. - pub fn start(self, sensor: sensor::TlsConfig) -> PublishConfigs { + pub fn start(self, sensor: tls_config_reload::Sensor) -> PublishConfigs { let settings = match self.settings { Conditional::Some(settings) => settings, Conditional::None(_) => {