Store transport metrics hierarchically (#66)

Previously, transport metrics have been stored in two "scopes": an
"open" scope, storing metrics by transport class, and a "close" scope
storing metrics by transport and end-of-stream classes.

Instead of storing these as parallel maps, this changes transport
metrics to be stored hierarchically so that the end-of-stream metrics
are stored _within_ the transport metrics type.

This will make it possible to share the transport metrics structure
directly with the transport wrapper so that a global lock need only be
taken at connect-time. Subsequent updates will then be scoped to the
shared transport structure.

This helps to setup linkerd/linkerd2#1430, among other improvements.

Furthermore, the `Scopes` type is no longer used for transport metrics,
since it's much easier to use the full affordances of `indexmap`,
especially now that it isn't part of an API.
This commit is contained in:
Oliver Gould 2018-08-14 16:29:07 -07:00 committed by GitHub
parent b9051f0f45
commit 48b7383ff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 71 additions and 103 deletions

View File

@ -1,3 +1,4 @@
use indexmap::IndexMap;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -8,7 +9,6 @@ use super::{
Counter, Counter,
Gauge, Gauge,
Histogram, Histogram,
Scopes,
}; };
use telemetry::Errno; use telemetry::Errno;
@ -27,32 +27,28 @@ metrics! {
/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports. /// Implements `fmt::Display` to render prometheus-formatted metrics for all transports.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Transports { pub struct Transports {
opens: OpenScopes, metrics: IndexMap<Key, Metrics>,
closes: CloseScopes,
} }
type OpenScopes = Scopes<TransportLabels, OpenMetrics>; /// Describes the dimensions across which transport metrics are aggregated.
///
/// Implements `fmt::Display` to render a comma-separated list of key-value pairs.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
struct Key {
direction: Direction,
peer: Peer,
tls_status: TlsStatus,
}
/// Holds all of the metrics for a class of transport.
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct OpenMetrics { struct Metrics {
open_total: Counter, open_total: Counter,
open_connections: Gauge, open_connections: Gauge,
write_bytes_total: Counter, write_bytes_total: Counter,
read_bytes_total: Counter, read_bytes_total: Counter,
}
type CloseScopes = Scopes<TransportCloseLabels, CloseMetrics>; by_eos: IndexMap<Eos, EosMetrics>,
/// Labels describing a TCP connection
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
struct TransportLabels {
/// Was the transport opened in the inbound or outbound direction?
direction: Direction,
peer: Peer,
/// Was the transport secured with TLS?
tls_status: TlsStatus,
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
@ -66,25 +62,20 @@ pub enum Eos {
}, },
} }
/// Labels describing the end of a TCP connection /// Holds metrics for a class of end-of-stream.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
struct TransportCloseLabels {
transport: TransportLabels,
eos: Eos,
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct CloseMetrics { struct EosMetrics {
close_total: Counter, close_total: Counter,
connection_duration: Histogram<latency::Ms>, connection_duration: Histogram<latency::Ms>,
} }
struct FmtEos<'a>(&'a Key, &'a Eos);
// ===== impl Transports ===== // ===== impl Transports =====
impl Transports { impl Transports {
pub fn open(&mut self, ctx: &ctx::transport::Ctx) { pub fn open(&mut self, ctx: &ctx::transport::Ctx) {
let k = TransportLabels::new(ctx); let metrics = self.metrics.entry(Key::new(ctx)).or_insert_with(|| Default::default());
let metrics = self.opens.get_or_default(k);
metrics.open_total.incr(); metrics.open_total.incr();
metrics.open_connections.incr(); metrics.open_connections.incr();
} }
@ -97,23 +88,31 @@ impl Transports {
rx: u64, rx: u64,
tx: u64, tx: u64,
) { ) {
let key = TransportLabels::new(ctx); let key = Key::new(ctx);
let o = self.opens.get_or_default(key); let metrics = self.metrics.entry(key).or_insert_with(|| Default::default());
o.open_connections.decr(); metrics.open_connections.decr();
o.read_bytes_total += rx; metrics.read_bytes_total += rx;
o.write_bytes_total += tx; metrics.write_bytes_total += tx;
let k = TransportCloseLabels::new(key, eos); let class = metrics.by_eos.entry(eos).or_insert_with(|| Default::default());
let c = self.closes.get_or_default(k); class.close_total.incr();
c.close_total.incr(); class.connection_duration.add(duration);
c.connection_duration.add(duration); }
/// Iterates over all end-of-stream metrics.
fn iter_eos(&self) -> impl Iterator<Item = (FmtEos, &EosMetrics)> {
self.metrics
.iter()
.flat_map(|(k, t)| {
t.by_eos.iter().map(move |(e, m)| (FmtEos(k ,e), m))
})
} }
#[cfg(test)] #[cfg(test)]
pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
self.opens self.metrics
.get(&TransportLabels::new(ctx)) .get(&Key::new(ctx))
.map(|m| m.open_total.into()) .map(|m| m.open_total.into())
.unwrap_or(0) .unwrap_or(0)
} }
@ -121,92 +120,75 @@ impl Transports {
// #[cfg(test)] // #[cfg(test)]
// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
// self.metrics // self.metrics
// .get(&Key::from(ctx)) // .get(&Key::new(ctx))
// .map(|m| m.open_connections.into()) // .map(|m| m.open_connections.into())
// .unwrap_or(0) // .unwrap_or(0)
// } // }
#[cfg(test)] #[cfg(test)]
pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
self.opens self.metrics
.get(&TransportLabels::new(ctx)) .get(&Key::new(ctx))
.map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into()))
.unwrap_or((0, 0)) .unwrap_or((0, 0))
} }
#[cfg(test)] #[cfg(test)]
pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
self.closes self.metrics
.get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) .get(&Key::new(ctx))
.map(|m| m.close_total.into()) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into()))
.unwrap_or(0) .unwrap_or(0)
} }
#[cfg(test)] #[cfg(test)]
pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> { pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
self.closes self.metrics
.get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos)) .get(&Key::new(ctx))
.map(|m| m.connection_duration.clone()) .and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone()))
.unwrap_or_default() .unwrap_or_default()
} }
} }
impl fmt::Display for Transports { impl fmt::Display for Transports {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.opens.fmt(f)?; if self.metrics.is_empty() {
self.closes.fmt(f)?;
Ok(())
}
}
// ===== impl OpenScopes =====
impl fmt::Display for OpenScopes {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.is_empty() {
return Ok(()); return Ok(());
} }
tcp_open_total.fmt_help(f)?; tcp_open_total.fmt_help(f)?;
tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?; tcp_open_total.fmt_scopes(f, &self.metrics, |m| &m.open_total)?;
tcp_open_connections.fmt_help(f)?; tcp_open_connections.fmt_help(f)?;
tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?; tcp_open_connections.fmt_scopes(f, &self.metrics, |m| &m.open_connections)?;
tcp_read_bytes_total.fmt_help(f)?; tcp_read_bytes_total.fmt_help(f)?;
tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?; tcp_read_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.read_bytes_total)?;
tcp_write_bytes_total.fmt_help(f)?; tcp_write_bytes_total.fmt_help(f)?;
tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?; tcp_write_bytes_total.fmt_scopes(f, &self.metrics, |m| &m.write_bytes_total)?;
Ok(())
}
}
// ===== impl CloseScopes =====
impl fmt::Display for CloseScopes {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.is_empty() {
return Ok(());
}
tcp_close_total.fmt_help(f)?; tcp_close_total.fmt_help(f)?;
tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?; tcp_close_total.fmt_scopes(f, self.iter_eos(), |e| &e.close_total)?;
tcp_connection_duration_ms.fmt_help(f)?; tcp_connection_duration_ms.fmt_help(f)?;
tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?; tcp_connection_duration_ms.fmt_scopes(f, self.iter_eos(), |e| &e.connection_duration)?;
Ok(()) Ok(())
} }
} }
// ===== impl TransportLabels ===== // ===== impl Key =====
impl TransportLabels { impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{},{},{}", self.direction, self.peer, self.tls_status)
}
}
impl Key {
fn new(ctx: &ctx::transport::Ctx) -> Self { fn new(ctx: &ctx::transport::Ctx) -> Self {
TransportLabels { Self {
direction: Direction::from_context(ctx.proxy().as_ref()), direction: Direction::from_context(ctx.proxy().as_ref()),
peer: match *ctx { peer: match *ctx {
ctx::transport::Ctx::Server(_) => Peer::Src, ctx::transport::Ctx::Server(_) => Peer::Src,
@ -217,12 +199,6 @@ impl TransportLabels {
} }
} }
impl fmt::Display for TransportLabels {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{},{},{}", self.direction, self.peer, self.tls_status)
}
}
impl fmt::Display for Peer { impl fmt::Display for Peer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
@ -232,22 +208,6 @@ impl fmt::Display for Peer {
} }
} }
// ===== impl TransportCloseLabels =====
impl TransportCloseLabels {
fn new(transport: TransportLabels, eos: Eos) -> Self {
Self {
transport,
eos,
}
}
}
impl fmt::Display for TransportCloseLabels {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{},{}", self.transport, self.eos)
}
}
// ===== impl Eos ===== // ===== impl Eos =====
@ -265,3 +225,11 @@ impl fmt::Display for Eos {
} }
} }
} }
// ===== impl FmtEos =====
impl<'a> fmt::Display for FmtEos<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{},{}", self.0, self.1)
}
}