Share a common tls_config_reload metrics object (#46)
The current implementation of TLS config reload telemetry has several layers: a sensor emits events into a dispatcher that updates metrics. This can be simplified by sharing a common metrics object between the metrics registry and config module. The metrics registry is updated to hold a read-only `tls_config_reload::Fmt`; and the config module holds a `tls_config_reload::Sensor`. The `Sensor` type holds a strong reference to an inner metrics structure and acquires a lock on updates. The `Fmt` type holds a weak reference to the metrics structure so that the metrics server can print the state as long as it's actually held in memory. If the `Sensor` is dropped (for instance, because TLS has been administratively disabled), then no metrics will be formatted by `Fmt`.
This commit is contained in:
parent
9912f4577b
commit
abc75a506d
|
@ -102,7 +102,7 @@ fn record_response_end(b: &mut Bencher) {
|
|||
frames_sent: 0,
|
||||
};
|
||||
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000));
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default());
|
||||
b.iter(|| r.record_event(&Event::StreamResponseEnd(rsp.clone(), end.clone())));
|
||||
}
|
||||
|
||||
|
@ -169,7 +169,7 @@ fn record_one_conn_request(b: &mut Bencher) {
|
|||
}),
|
||||
];
|
||||
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000));
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default());
|
||||
b.iter(|| for e in &events { r.record_event(e); });
|
||||
}
|
||||
|
||||
|
@ -239,6 +239,6 @@ fn record_many_dsts(b: &mut Bencher) {
|
|||
tx_bytes: 4321,
|
||||
}));
|
||||
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000));
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default());
|
||||
b.iter(|| for e in &events { r.record_event(e); });
|
||||
}
|
||||
|
|
|
@ -247,14 +247,14 @@ where
|
|||
);
|
||||
|
||||
let (taps, observe) = control::Observe::new(100);
|
||||
let (sensors, metrics_server) = telemetry::new(
|
||||
let (sensors, tls_config_sensor, metrics_server) = telemetry::new(
|
||||
&process_ctx,
|
||||
config.metrics_retain_idle,
|
||||
&taps,
|
||||
);
|
||||
|
||||
let tls_client_config = tls_config_watch.client.clone();
|
||||
let tls_cfg_bg = tls_config_watch.start(sensors.tls_config());
|
||||
let tls_cfg_bg = tls_config_watch.start(tls_config_sensor);
|
||||
|
||||
let controller_tls = config.tls_settings.as_ref().and_then(|settings| {
|
||||
settings.controller_identity.as_ref().map(|controller_identity| {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use h2;
|
||||
|
||||
|
@ -17,9 +17,6 @@ pub enum Event {
|
|||
StreamResponseOpen(Arc<ctx::http::Response>, StreamResponseOpen),
|
||||
StreamResponseFail(Arc<ctx::http::Response>, StreamResponseFail),
|
||||
StreamResponseEnd(Arc<ctx::http::Response>, StreamResponseEnd),
|
||||
|
||||
TlsConfigReloaded(SystemTime),
|
||||
TlsConfigReloadFailed(::transport::tls::ConfigError),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
|
@ -59,7 +59,7 @@ mod latency;
|
|||
mod process;
|
||||
mod record;
|
||||
mod serve;
|
||||
mod tls_config_reload;
|
||||
pub mod tls_config_reload;
|
||||
mod transport;
|
||||
|
||||
use self::counter::Counter;
|
||||
|
@ -74,7 +74,6 @@ use self::labels::{
|
|||
pub use self::labels::DstLabels;
|
||||
pub use self::record::Record;
|
||||
pub use self::serve::Serve;
|
||||
pub use self::tls_config_reload::TlsConfigReload;
|
||||
|
||||
/// Writes a metric in prometheus-formatted output.
|
||||
///
|
||||
|
@ -111,7 +110,7 @@ struct Root {
|
|||
responses: http::ResponseScopes,
|
||||
transports: transport::OpenScopes,
|
||||
transport_closes: transport::CloseScopes,
|
||||
tls_config_reload: TlsConfigReload,
|
||||
tls_config_reload: tls_config_reload::Fmt,
|
||||
process: process::Process,
|
||||
}
|
||||
|
||||
|
@ -136,8 +135,10 @@ struct Stamped<T> {
|
|||
/// is a Hyper service which can be used to create the server for the
|
||||
/// scrape endpoint, while the `Record` side can receive updates to the
|
||||
/// metrics by calling `record_event`.
|
||||
pub fn new(process: &Arc<ctx::Process>, idle_retain: Duration) -> (Record, Serve){
|
||||
let metrics = Arc::new(Mutex::new(Root::new(process)));
|
||||
pub fn new(process: &Arc<ctx::Process>, idle_retain: Duration, tls: tls_config_reload::Fmt)
|
||||
-> (Record, Serve)
|
||||
{
|
||||
let metrics = Arc::new(Mutex::new(Root::new(process, tls)));
|
||||
(Record::new(&metrics), Serve::new(&metrics, idle_retain))
|
||||
}
|
||||
|
||||
|
@ -174,9 +175,10 @@ impl<'a, M: FmtMetric> Metric<'a, M> {
|
|||
// ===== impl Root =====
|
||||
|
||||
impl Root {
|
||||
pub fn new(process: &Arc<ctx::Process>) -> Self {
|
||||
pub fn new(process: &Arc<ctx::Process>, tls_config_reload: tls_config_reload::Fmt) -> Self {
|
||||
Self {
|
||||
process: process::Process::new(&process),
|
||||
tls_config_reload,
|
||||
.. Root::default()
|
||||
}
|
||||
}
|
||||
|
@ -205,10 +207,6 @@ impl Root {
|
|||
.stamped()
|
||||
}
|
||||
|
||||
fn tls_config(&mut self) -> &mut TlsConfigReload {
|
||||
&mut self.tls_config_reload
|
||||
}
|
||||
|
||||
fn retain_since(&mut self, epoch: Instant) {
|
||||
self.requests.retain_since(epoch);
|
||||
self.responses.retain_since(epoch);
|
||||
|
|
|
@ -82,12 +82,6 @@ impl Record {
|
|||
.close(close.duration);
|
||||
})
|
||||
},
|
||||
|
||||
Event::TlsConfigReloaded(ref when) =>
|
||||
self.update(|m| m.tls_config().success(when)),
|
||||
|
||||
Event::TlsConfigReloadFailed(ref err) =>
|
||||
self.update(|m| m.tls_config().error(err.clone())),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -135,7 +129,7 @@ mod test {
|
|||
frames_sent: 0,
|
||||
};
|
||||
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(100));
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(100), Default::default());
|
||||
let ev = Event::StreamResponseEnd(rsp.clone(), end.clone());
|
||||
let labels = labels::ResponseLabels::new(&rsp, None);
|
||||
|
||||
|
@ -230,7 +224,7 @@ mod test {
|
|||
),
|
||||
];
|
||||
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000));
|
||||
let (mut r, _) = metrics::new(&process, Duration::from_secs(1000), Default::default());
|
||||
|
||||
let req_labels = RequestLabels::new(&req);
|
||||
let rsp_labels = ResponseLabels::new(&rsp, None);
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
use std::{fmt, path::PathBuf, time::{SystemTime, UNIX_EPOCH}};
|
||||
use std::{
|
||||
fmt,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Mutex, Weak},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use telemetry::metrics::{Counter, Gauge, Metric, Scopes, labels::Errno};
|
||||
use telemetry::metrics::{labels::Errno, Counter, Gauge, Metric, Scopes};
|
||||
use transport::tls;
|
||||
|
||||
metrics! {
|
||||
|
@ -13,8 +18,25 @@ metrics! {
|
|||
}
|
||||
}
|
||||
|
||||
/// Constructs a Sensor/Fmt pair for TLS config reload metrics.
|
||||
pub fn new() -> (Sensor, Fmt) {
|
||||
let inner = Arc::new(Mutex::new(Inner::default()));
|
||||
let fmt = Fmt(Arc::downgrade(&inner));
|
||||
(Sensor(inner), fmt)
|
||||
}
|
||||
|
||||
/// Supports recording TLS config reload metrics.
|
||||
///
|
||||
/// When this type is dropped, its metrics may no longer be formatted for prometheus.
|
||||
#[derive(Debug)]
|
||||
pub struct Sensor(Arc<Mutex<Inner>>);
|
||||
|
||||
/// Formats metrics for Prometheus for a corresonding `Sensor`.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TlsConfigReload {
|
||||
pub struct Fmt(Weak<Mutex<Inner>>);
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Inner {
|
||||
last_reload: Option<Gauge>,
|
||||
by_status: Scopes<Status, Counter>,
|
||||
}
|
||||
|
@ -25,41 +47,61 @@ enum Status {
|
|||
InvalidTrustAnchors,
|
||||
InvalidPrivateKey,
|
||||
InvalidEndEntityCert,
|
||||
Io { path: PathBuf, errno: Option<Errno>, },
|
||||
Io { path: PathBuf, errno: Option<Errno> },
|
||||
}
|
||||
|
||||
// ===== impl TlsConfigReload =====
|
||||
// ===== impl Sensor =====
|
||||
|
||||
impl TlsConfigReload {
|
||||
pub fn success(&mut self, when: &SystemTime) {
|
||||
let t = when
|
||||
impl Sensor {
|
||||
pub fn reloaded(&mut self) {
|
||||
let t = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("times must be after UNIX epoch")
|
||||
.as_secs();
|
||||
self.last_reload = Some(t.into());
|
||||
|
||||
self.by_status.scopes
|
||||
.entry(Status::Reloaded)
|
||||
.or_insert_with(|| Counter::default())
|
||||
.incr()
|
||||
if let Ok(mut inner) = self.0.lock() {
|
||||
inner.last_reload = Some(t.into());
|
||||
|
||||
inner
|
||||
.by_status
|
||||
.scopes
|
||||
.entry(Status::Reloaded)
|
||||
.or_insert_with(|| Counter::default())
|
||||
.incr();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn error(&mut self, e: tls::ConfigError) {
|
||||
self.by_status.scopes
|
||||
.entry(e.into())
|
||||
.or_insert_with(|| Counter::default())
|
||||
.incr()
|
||||
pub fn failed(&mut self, e: tls::ConfigError) {
|
||||
if let Ok(mut inner) = self.0.lock() {
|
||||
inner
|
||||
.by_status
|
||||
.scopes
|
||||
.entry(e.into())
|
||||
.or_insert_with(|| Counter::default())
|
||||
.incr();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TlsConfigReload {
|
||||
// ===== impl Fmt =====
|
||||
|
||||
impl fmt::Display for Fmt {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if !self.by_status.scopes.is_empty() {
|
||||
let lock = match self.0.upgrade() {
|
||||
None => return Ok(()),
|
||||
Some(lock) => lock,
|
||||
};
|
||||
let inner = match lock.lock() {
|
||||
Err(_) => return Ok(()),
|
||||
Ok(inner) => inner,
|
||||
};
|
||||
|
||||
if !inner.by_status.scopes.is_empty() {
|
||||
tls_config_reload_total.fmt_help(f)?;
|
||||
tls_config_reload_total.fmt_scopes(f, &self.by_status, |s| &s)?;
|
||||
tls_config_reload_total.fmt_scopes(f, &inner.by_status, |s| &s)?;
|
||||
}
|
||||
|
||||
if let Some(timestamp) = self.last_reload {
|
||||
if let Some(timestamp) = inner.last_reload {
|
||||
tls_config_last_reload_seconds.fmt_help(f)?;
|
||||
tls_config_last_reload_seconds.fmt_metric(f, timestamp)?;
|
||||
}
|
||||
|
@ -73,14 +115,13 @@ impl fmt::Display for TlsConfigReload {
|
|||
impl From<tls::ConfigError> for Status {
|
||||
fn from(err: tls::ConfigError) -> Self {
|
||||
match err {
|
||||
tls::ConfigError::Io(path, error_code) =>
|
||||
Status::Io { path, errno: error_code.map(Errno::from) },
|
||||
tls::ConfigError::FailedToParseTrustAnchors(_) =>
|
||||
Status::InvalidTrustAnchors,
|
||||
tls::ConfigError::EndEntityCertIsNotValid(_) =>
|
||||
Status::InvalidEndEntityCert,
|
||||
tls::ConfigError::InvalidPrivateKey =>
|
||||
Status::InvalidPrivateKey,
|
||||
tls::ConfigError::Io(path, error_code) => Status::Io {
|
||||
path,
|
||||
errno: error_code.map(Errno::from),
|
||||
},
|
||||
tls::ConfigError::FailedToParseTrustAnchors(_) => Status::InvalidTrustAnchors,
|
||||
tls::ConfigError::EndEntityCertIsNotValid(_) => Status::InvalidEndEntityCert,
|
||||
tls::ConfigError::InvalidPrivateKey => Status::InvalidPrivateKey,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -88,25 +129,27 @@ impl From<tls::ConfigError> for Status {
|
|||
impl fmt::Display for Status {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Status::Reloaded =>
|
||||
f.pad("status=\"reloaded\""),
|
||||
Status::Io { ref path, errno: Some(errno) } =>
|
||||
write!(f,
|
||||
"status=\"io_error\",path=\"{}\",errno=\"{}\"",
|
||||
path.display(), errno
|
||||
),
|
||||
Status::Io { ref path, errno: None } =>
|
||||
write!(f,
|
||||
"status=\"io_error\",path=\"{}\",errno=\"UNKNOWN\"",
|
||||
path.display(),
|
||||
),
|
||||
Status::InvalidPrivateKey =>
|
||||
f.pad("status=\"invalid_private_key\""),
|
||||
Status::InvalidEndEntityCert =>
|
||||
f.pad("status=\"invalid_end_entity_cert\""),
|
||||
Status::InvalidTrustAnchors =>
|
||||
f.pad("status=\"invalid_trust_anchors\""),
|
||||
Status::Reloaded => f.pad("status=\"reloaded\""),
|
||||
Status::Io {
|
||||
ref path,
|
||||
errno: Some(errno),
|
||||
} => write!(
|
||||
f,
|
||||
"status=\"io_error\",path=\"{}\",errno=\"{}\"",
|
||||
path.display(),
|
||||
errno
|
||||
),
|
||||
Status::Io {
|
||||
ref path,
|
||||
errno: None,
|
||||
} => write!(
|
||||
f,
|
||||
"status=\"io_error\",path=\"{}\",errno=\"UNKNOWN\"",
|
||||
path.display(),
|
||||
),
|
||||
Status::InvalidPrivateKey => f.pad("status=\"invalid_private_key\""),
|
||||
Status::InvalidEndEntityCert => f.pad("status=\"invalid_end_entity_cert\""),
|
||||
Status::InvalidTrustAnchors => f.pad("status=\"invalid_trust_anchors\""),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,8 +15,9 @@ pub fn new(
|
|||
process: &Arc<ctx::Process>,
|
||||
metrics_retain_idle: Duration,
|
||||
taps: &Arc<Mutex<tap::Taps>>,
|
||||
) -> (Sensors, metrics::Serve) {
|
||||
let (metrics_record, metrics_serve) = metrics::new(process, metrics_retain_idle);
|
||||
) -> (Sensors, metrics::tls_config_reload::Sensor, metrics::Serve) {
|
||||
let (tls_config_sensor, tls_config_fmt) = metrics::tls_config_reload::new();
|
||||
let (metrics_record, metrics_serve) = metrics::new(process, metrics_retain_idle, tls_config_fmt);
|
||||
let s = Sensors::new(metrics_record, taps);
|
||||
(s, metrics_serve)
|
||||
(s, tls_config_sensor, metrics_serve)
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ use tower_h2::Body;
|
|||
|
||||
use ctx;
|
||||
use telemetry::{event, metrics, tap};
|
||||
use transport::{Connection, tls};
|
||||
use transport::Connection;
|
||||
use transparency::ClientError;
|
||||
|
||||
pub mod http;
|
||||
|
@ -32,10 +32,6 @@ struct Handle(Option<Inner>);
|
|||
#[derive(Clone, Debug)]
|
||||
pub struct Sensors(Option<Inner>);
|
||||
|
||||
/// Given to the TLS config watch to generate events on reloads.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TlsConfig(Handle);
|
||||
|
||||
impl Handle {
|
||||
fn send<F>(&mut self, mk: F)
|
||||
where
|
||||
|
@ -104,20 +100,4 @@ impl Sensors {
|
|||
{
|
||||
NewHttp::new(new_service, Handle(self.0.clone()), client_ctx)
|
||||
}
|
||||
|
||||
pub fn tls_config(&self) -> TlsConfig {
|
||||
TlsConfig(Handle(self.0.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
|
||||
pub fn reloaded(&mut self) {
|
||||
use std::time::SystemTime;
|
||||
self.0.send(|| event::Event::TlsConfigReloaded(SystemTime::now()))
|
||||
}
|
||||
|
||||
pub fn failed(&mut self, err: tls::ConfigError) {
|
||||
self.0.send(|| event::Event::TlsConfigReloadFailed(err.into()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ use super::{
|
|||
webpki,
|
||||
};
|
||||
use conditional::Conditional;
|
||||
use telemetry::sensor;
|
||||
use telemetry::metrics::tls_config_reload;
|
||||
|
||||
use futures::{future, stream, Future, Stream};
|
||||
use futures_watch::{Store, Watch};
|
||||
|
@ -182,7 +182,7 @@ impl CommonSettings {
|
|||
fn stream_changes(
|
||||
self,
|
||||
interval: Duration,
|
||||
mut sensor: sensor::TlsConfig,
|
||||
mut sensor: tls_config_reload::Sensor,
|
||||
) -> impl Stream<Item = CommonConfig, Error = ()>
|
||||
{
|
||||
let paths = self.paths().iter()
|
||||
|
@ -337,7 +337,7 @@ impl ConfigWatch {
|
|||
///
|
||||
/// The returned task Future is expected to never complete. If TLS is
|
||||
/// disabled then an empty future is returned.
|
||||
pub fn start(self, sensor: sensor::TlsConfig) -> PublishConfigs {
|
||||
pub fn start(self, sensor: tls_config_reload::Sensor) -> PublishConfigs {
|
||||
let settings = match self.settings {
|
||||
Conditional::Some(settings) => settings,
|
||||
Conditional::None(_) => {
|
||||
|
|
Loading…
Reference in New Issue