Narrow transport metric's API (#65)
The `telemetry::metrics::transport` module exposes much of its implementation details to callers, which makes it difficult to make changes to how the module is structured. In preparation for further refactors, this change narrows the module's public API: All labels and scopes types have been made private. A single, public `Transports` type has been introduce to describe the entire public interface of the module. This has been crafted to be free of `event` types and to have minimal external dependencies. A new `transport::Eos` type has been introduced to replace the overloaded `labels::Classification` type -- this type was initially introduced to model _HTTP response_ classification, but it was reused for transports. This is an undesirable coupling that will have to be broken when we start to adddress HTTP response classification properly.
This commit is contained in:
parent
d0e1e71bff
commit
b9051f0f45
|
@ -9,7 +9,6 @@ use http;
|
||||||
|
|
||||||
use ctx;
|
use ctx;
|
||||||
use conditional::Conditional;
|
use conditional::Conditional;
|
||||||
use telemetry::event;
|
|
||||||
use transport::tls;
|
use transport::tls;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||||
|
@ -192,15 +191,6 @@ impl Classification {
|
||||||
grpc_status.map(Classification::grpc_status)
|
grpc_status.map(Classification::grpc_status)
|
||||||
.unwrap_or_else(|| Classification::http_status(&rsp.status))
|
.unwrap_or_else(|| Classification::http_status(&rsp.status))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transport_close(close: &event::TransportClose) -> Self {
|
|
||||||
if close.clean {
|
|
||||||
Classification::Success
|
|
||||||
} else {
|
|
||||||
Classification::Failure
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for Classification {
|
impl fmt::Display for Classification {
|
||||||
|
|
|
@ -51,11 +51,11 @@ use self::labels::{
|
||||||
RequestLabels,
|
RequestLabels,
|
||||||
ResponseLabels,
|
ResponseLabels,
|
||||||
};
|
};
|
||||||
use self::transport::{TransportLabels, TransportCloseLabels};
|
|
||||||
pub use self::labels::DstLabels;
|
pub use self::labels::DstLabels;
|
||||||
pub use self::record::Record;
|
pub use self::record::Record;
|
||||||
pub use self::scopes::Scopes;
|
pub use self::scopes::Scopes;
|
||||||
pub use self::serve::Serve;
|
pub use self::serve::Serve;
|
||||||
|
pub use self::transport::Transports;
|
||||||
use super::process;
|
use super::process;
|
||||||
use super::tls_config_reload;
|
use super::tls_config_reload;
|
||||||
|
|
||||||
|
@ -92,8 +92,7 @@ pub struct Metric<'a, M: FmtMetric> {
|
||||||
struct Root {
|
struct Root {
|
||||||
requests: http::RequestScopes,
|
requests: http::RequestScopes,
|
||||||
responses: http::ResponseScopes,
|
responses: http::ResponseScopes,
|
||||||
transports: transport::OpenScopes,
|
transports: Transports,
|
||||||
transport_closes: transport::CloseScopes,
|
|
||||||
tls_config_reload: tls_config_reload::Report,
|
tls_config_reload: tls_config_reload::Report,
|
||||||
process: process::Report,
|
process: process::Report,
|
||||||
}
|
}
|
||||||
|
@ -171,12 +170,8 @@ impl Root {
|
||||||
self.responses.get_or_default(labels).stamped()
|
self.responses.get_or_default(labels).stamped()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics {
|
fn transports(&mut self) -> &mut Transports {
|
||||||
self.transports.get_or_default(labels)
|
&mut self.transports
|
||||||
}
|
|
||||||
|
|
||||||
fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics {
|
|
||||||
self.transport_closes.get_or_default(labels)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retain_since(&mut self, epoch: Instant) {
|
fn retain_since(&mut self, epoch: Instant) {
|
||||||
|
@ -190,7 +185,6 @@ impl fmt::Display for Root {
|
||||||
self.requests.fmt(f)?;
|
self.requests.fmt(f)?;
|
||||||
self.responses.fmt(f)?;
|
self.responses.fmt(f)?;
|
||||||
self.transports.fmt(f)?;
|
self.transports.fmt(f)?;
|
||||||
self.transport_closes.fmt(f)?;
|
|
||||||
self.tls_config_reload.fmt(f)?;
|
self.tls_config_reload.fmt(f)?;
|
||||||
self.process.fmt(f)?;
|
self.process.fmt(f)?;
|
||||||
|
|
||||||
|
@ -232,7 +226,6 @@ impl<T> ::std::ops::Deref for Stamped<T> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use ctx::test_util::*;
|
use ctx::test_util::*;
|
||||||
use telemetry::event;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use conditional::Conditional;
|
use conditional::Conditional;
|
||||||
use tls;
|
use tls;
|
||||||
|
@ -250,22 +243,14 @@ mod tests {
|
||||||
let (req, rsp) = request("http://nba.com", &server, &client);
|
let (req, rsp) = request("http://nba.com", &server, &client);
|
||||||
|
|
||||||
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
|
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
|
||||||
let transport = TransportLabels::new(&client_transport);
|
root.transports.open(&client_transport);
|
||||||
root.transport(transport.clone()).open();
|
|
||||||
|
|
||||||
root.request(RequestLabels::new(&req)).end();
|
root.request(RequestLabels::new(&req)).end();
|
||||||
root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
|
root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
|
||||||
root.transport(transport).close(100, 200);
|
|
||||||
|
|
||||||
let end = TransportCloseLabels::new(&client_transport, &event::TransportClose {
|
let duration = Duration::from_millis(15);
|
||||||
clean: true,
|
root.transports().close(&client_transport, transport::Eos::Clean, duration, 100, 200);
|
||||||
errno: None,
|
}
|
||||||
duration: Duration::from_millis(15),
|
|
||||||
rx_bytes: 40,
|
|
||||||
tx_bytes: 0,
|
|
||||||
});
|
|
||||||
root.transport_close(end).close(Duration::from_millis(15));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn expiry() {
|
fn expiry() {
|
||||||
|
@ -278,7 +263,7 @@ mod tests {
|
||||||
let mut root = Root::default();
|
let mut root = Root::default();
|
||||||
|
|
||||||
let t0 = Instant::now();
|
let t0 = Instant::now();
|
||||||
root.transport(TransportLabels::new(&server_transport)).open();
|
root.transports().open(&server_transport);
|
||||||
|
|
||||||
mock_route(&mut root, &proxy, &server, "warriors");
|
mock_route(&mut root, &proxy, &server, "warriors");
|
||||||
let t1 = Instant::now();
|
let t1 = Instant::now();
|
||||||
|
@ -288,25 +273,17 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(root.requests.len(), 2);
|
assert_eq!(root.requests.len(), 2);
|
||||||
assert_eq!(root.responses.len(), 2);
|
assert_eq!(root.responses.len(), 2);
|
||||||
assert_eq!(root.transports.len(), 2);
|
|
||||||
assert_eq!(root.transport_closes.len(), 1);
|
|
||||||
|
|
||||||
root.retain_since(t0);
|
root.retain_since(t0);
|
||||||
assert_eq!(root.requests.len(), 2);
|
assert_eq!(root.requests.len(), 2);
|
||||||
assert_eq!(root.responses.len(), 2);
|
assert_eq!(root.responses.len(), 2);
|
||||||
assert_eq!(root.transports.len(), 2);
|
|
||||||
assert_eq!(root.transport_closes.len(), 1);
|
|
||||||
|
|
||||||
root.retain_since(t1);
|
root.retain_since(t1);
|
||||||
assert_eq!(root.requests.len(), 1);
|
assert_eq!(root.requests.len(), 1);
|
||||||
assert_eq!(root.responses.len(), 1);
|
assert_eq!(root.responses.len(), 1);
|
||||||
assert_eq!(root.transports.len(), 2);
|
|
||||||
assert_eq!(root.transport_closes.len(), 1);
|
|
||||||
|
|
||||||
root.retain_since(t2);
|
root.retain_since(t2);
|
||||||
assert_eq!(root.requests.len(), 0);
|
assert_eq!(root.requests.len(), 0);
|
||||||
assert_eq!(root.responses.len(), 0);
|
assert_eq!(root.responses.len(), 0);
|
||||||
assert_eq!(root.transports.len(), 2);
|
|
||||||
assert_eq!(root.transport_closes.len(), 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use super::labels::{
|
||||||
RequestLabels,
|
RequestLabels,
|
||||||
ResponseLabels,
|
ResponseLabels,
|
||||||
};
|
};
|
||||||
use super::transport::{TransportLabels, TransportCloseLabels};
|
use super::transport;
|
||||||
|
|
||||||
/// Tracks Prometheus metrics
|
/// Tracks Prometheus metrics
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -68,17 +68,21 @@ impl Record {
|
||||||
|
|
||||||
Event::TransportOpen(ref ctx) => {
|
Event::TransportOpen(ref ctx) => {
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
metrics.transport(TransportLabels::new(ctx)).open();
|
metrics.transports().open(ctx);
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
Event::TransportClose(ref ctx, ref close) => {
|
Event::TransportClose(ref ctx, ref close) => {
|
||||||
|
let eos = if close.clean {
|
||||||
|
transport::Eos::Clean
|
||||||
|
} else {
|
||||||
|
transport::Eos::Error {
|
||||||
|
errno: close.errno.map(|e| e.into())
|
||||||
|
}
|
||||||
|
};
|
||||||
self.update(|metrics| {
|
self.update(|metrics| {
|
||||||
metrics.transport(TransportLabels::new(ctx))
|
metrics.transports()
|
||||||
.close(close.rx_bytes, close.tx_bytes);
|
.close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes);
|
||||||
|
|
||||||
metrics.transport_close(TransportCloseLabels::new(ctx, close))
|
|
||||||
.close(close.duration);
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -89,7 +93,7 @@ impl Record {
|
||||||
mod test {
|
mod test {
|
||||||
use telemetry::{
|
use telemetry::{
|
||||||
event,
|
event,
|
||||||
metrics::{self, labels, transport::{TransportLabels, TransportCloseLabels}},
|
metrics::{self, labels},
|
||||||
Event,
|
Event,
|
||||||
};
|
};
|
||||||
use ctx::{self, test_util::*, transport::TlsStatus};
|
use ctx::{self, test_util::*, transport::TlsStatus};
|
||||||
|
@ -228,33 +232,16 @@ mod test {
|
||||||
|
|
||||||
let req_labels = RequestLabels::new(&req);
|
let req_labels = RequestLabels::new(&req);
|
||||||
let rsp_labels = ResponseLabels::new(&rsp, None);
|
let rsp_labels = ResponseLabels::new(&rsp, None);
|
||||||
let srv_open_labels = TransportLabels::new(&server_transport);
|
|
||||||
let srv_close_labels = TransportCloseLabels::new(
|
|
||||||
&ctx::transport::Ctx::Server(server.clone()),
|
|
||||||
&transport_close,
|
|
||||||
);
|
|
||||||
let client_open_labels = TransportLabels::new(&client_transport);
|
|
||||||
let client_close_labels = TransportCloseLabels::new(
|
|
||||||
&ctx::transport::Ctx::Client(client.clone()),
|
|
||||||
&transport_close,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(client_tls, req_labels.tls_status().into());
|
assert_eq!(client_tls, req_labels.tls_status().into());
|
||||||
assert_eq!(client_tls, rsp_labels.tls_status().into());
|
assert_eq!(client_tls, rsp_labels.tls_status().into());
|
||||||
assert_eq!(client_tls, client_open_labels.tls_status().into());
|
|
||||||
assert_eq!(client_tls, client_close_labels.tls_status().into());
|
|
||||||
assert_eq!(server_tls, srv_open_labels.tls_status().into());
|
|
||||||
assert_eq!(server_tls, srv_close_labels.tls_status().into());
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let lock = r.metrics.lock()
|
let mut lock = r.metrics.lock().expect("lock");
|
||||||
.expect("lock");
|
|
||||||
assert!(lock.requests.get(&req_labels).is_none());
|
assert!(lock.requests.get(&req_labels).is_none());
|
||||||
assert!(lock.responses.get(&rsp_labels).is_none());
|
assert!(lock.responses.get(&rsp_labels).is_none());
|
||||||
assert!(lock.transports.get(&srv_open_labels).is_none());
|
assert_eq!(lock.transports().open_total(&server_transport), 0);
|
||||||
assert!(lock.transports.get(&client_open_labels).is_none());
|
assert_eq!(lock.transports().open_total(&client_transport), 0);
|
||||||
assert!(lock.transport_closes.get(&srv_close_labels).is_none());
|
|
||||||
assert!(lock.transport_closes.get(&client_close_labels).is_none());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for e in &events {
|
for e in &events {
|
||||||
|
@ -262,8 +249,7 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let lock = r.metrics.lock()
|
let mut lock = r.metrics.lock().expect("lock");
|
||||||
.expect("lock");
|
|
||||||
|
|
||||||
// === request scope ====================================
|
// === request scope ====================================
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
@ -274,55 +260,35 @@ mod test {
|
||||||
);
|
);
|
||||||
|
|
||||||
// === response scope ===================================
|
// === response scope ===================================
|
||||||
let response_scope = lock
|
{
|
||||||
.responses
|
let response_scope = lock
|
||||||
.get(&rsp_labels)
|
.responses
|
||||||
.expect("response scope missing");
|
.get(&rsp_labels)
|
||||||
assert_eq!(response_scope.total(), 1);
|
.expect("response scope missing");
|
||||||
|
assert_eq!(response_scope.total(), 1);
|
||||||
|
|
||||||
response_scope.latency()
|
response_scope.latency()
|
||||||
.assert_bucket_exactly(200, 1)
|
.assert_bucket_exactly(200, 1)
|
||||||
.assert_gt_exactly(200, 0)
|
.assert_gt_exactly(200, 0)
|
||||||
.assert_lt_exactly(200, 0);
|
.assert_lt_exactly(200, 0);
|
||||||
|
}
|
||||||
// === server transport open scope ======================
|
|
||||||
let srv_transport_scope = lock
|
|
||||||
.transports
|
|
||||||
.get(&srv_open_labels)
|
|
||||||
.expect("server transport scope missing");
|
|
||||||
assert_eq!(srv_transport_scope.open_total(), 1);
|
|
||||||
assert_eq!(srv_transport_scope.write_bytes_total(), 4321);
|
|
||||||
assert_eq!(srv_transport_scope.read_bytes_total(), 4321);
|
|
||||||
|
|
||||||
// === client transport open scope ======================
|
|
||||||
let client_transport_scope = lock
|
|
||||||
.transports
|
|
||||||
.get(&client_open_labels)
|
|
||||||
.expect("client transport scope missing");
|
|
||||||
assert_eq!(client_transport_scope.open_total(), 1);
|
|
||||||
assert_eq!(client_transport_scope.write_bytes_total(), 4321);
|
|
||||||
assert_eq!(client_transport_scope.read_bytes_total(), 4321);
|
|
||||||
|
|
||||||
|
use super::transport::Eos;
|
||||||
let transport_duration: u64 = 30_000 * 1_000;
|
let transport_duration: u64 = 30_000 * 1_000;
|
||||||
|
let t = lock.transports();
|
||||||
|
|
||||||
// === server transport close scope =====================
|
assert_eq!(t.open_total(&server_transport), 1);
|
||||||
let srv_transport_close_scope = lock
|
assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321));
|
||||||
.transport_closes
|
assert_eq!(t.close_total(&server_transport, Eos::Clean), 1);
|
||||||
.get(&srv_close_labels)
|
t.connection_durations(&server_transport, Eos::Clean)
|
||||||
.expect("server transport close scope missing");
|
|
||||||
assert_eq!(srv_transport_close_scope.close_total(), 1);
|
|
||||||
srv_transport_close_scope.connection_duration()
|
|
||||||
.assert_bucket_exactly(transport_duration, 1)
|
.assert_bucket_exactly(transport_duration, 1)
|
||||||
.assert_gt_exactly(transport_duration, 0)
|
.assert_gt_exactly(transport_duration, 0)
|
||||||
.assert_lt_exactly(transport_duration, 0);
|
.assert_lt_exactly(transport_duration, 0);
|
||||||
|
|
||||||
// === client transport close scope =====================
|
assert_eq!(t.open_total(&client_transport), 1);
|
||||||
let client_transport_close_scope = lock
|
assert_eq!(t.rx_tx_bytes_total(&client_transport), (4321, 4321));
|
||||||
.transport_closes
|
assert_eq!(t.close_total(&server_transport, Eos::Clean), 1);
|
||||||
.get(&client_close_labels)
|
t.connection_durations(&server_transport, Eos::Clean)
|
||||||
.expect("client transport close scope missing");
|
|
||||||
assert_eq!(client_transport_close_scope.close_total(), 1);
|
|
||||||
client_transport_close_scope.connection_duration()
|
|
||||||
.assert_bucket_exactly(transport_duration, 1)
|
.assert_bucket_exactly(transport_duration, 1)
|
||||||
.assert_gt_exactly(transport_duration, 0)
|
.assert_gt_exactly(transport_duration, 0)
|
||||||
.assert_lt_exactly(transport_duration, 0);
|
.assert_lt_exactly(transport_duration, 0);
|
||||||
|
|
|
@ -3,30 +3,49 @@ use std::time::Duration;
|
||||||
|
|
||||||
use ctx;
|
use ctx;
|
||||||
use super::{
|
use super::{
|
||||||
labels::{Classification, Direction, TlsStatus},
|
labels::{Direction, TlsStatus},
|
||||||
latency,
|
latency,
|
||||||
Counter,
|
Counter,
|
||||||
Gauge,
|
Gauge,
|
||||||
Histogram,
|
Histogram,
|
||||||
Scopes,
|
Scopes,
|
||||||
};
|
};
|
||||||
use telemetry::{event, Errno};
|
use telemetry::Errno;
|
||||||
|
|
||||||
pub(super) type OpenScopes = Scopes<TransportLabels, OpenMetrics>;
|
metrics! {
|
||||||
|
tcp_open_total: Counter { "Total count of opened connections" },
|
||||||
|
tcp_open_connections: Gauge { "Number of currently-open connections" },
|
||||||
|
tcp_read_bytes_total: Counter { "Total count of bytes read from peers" },
|
||||||
|
tcp_write_bytes_total: Counter { "Total count of bytes written to peers" },
|
||||||
|
|
||||||
|
tcp_close_total: Counter { "Total count of closed connections" },
|
||||||
|
tcp_connection_duration_ms: Histogram<latency::Ms> { "Connection lifetimes" }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Holds all transport stats.
|
||||||
|
///
|
||||||
|
/// Implements `fmt::Display` to render prometheus-formatted metrics for all transports.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct Transports {
|
||||||
|
opens: OpenScopes,
|
||||||
|
closes: CloseScopes,
|
||||||
|
}
|
||||||
|
|
||||||
|
type OpenScopes = Scopes<TransportLabels, OpenMetrics>;
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub(super) struct OpenMetrics {
|
struct OpenMetrics {
|
||||||
open_total: Counter,
|
open_total: Counter,
|
||||||
open_connections: Gauge,
|
open_connections: Gauge,
|
||||||
write_bytes_total: Counter,
|
write_bytes_total: Counter,
|
||||||
read_bytes_total: Counter,
|
read_bytes_total: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) type CloseScopes = Scopes<TransportCloseLabels, CloseMetrics>;
|
type CloseScopes = Scopes<TransportCloseLabels, CloseMetrics>;
|
||||||
|
|
||||||
/// Labels describing a TCP connection
|
/// Labels describing a TCP connection
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||||
pub struct TransportLabels {
|
struct TransportLabels {
|
||||||
/// Was the transport opened in the inbound or outbound direction?
|
/// Was the transport opened in the inbound or outbound direction?
|
||||||
direction: Direction,
|
direction: Direction,
|
||||||
|
|
||||||
|
@ -37,141 +56,156 @@ pub struct TransportLabels {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||||
pub enum Peer { Src, Dst }
|
enum Peer { Src, Dst }
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||||
|
pub enum Eos {
|
||||||
|
Clean,
|
||||||
|
Error {
|
||||||
|
errno: Option<Errno>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
/// Labels describing the end of a TCP connection
|
/// Labels describing the end of a TCP connection
|
||||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||||
pub struct TransportCloseLabels {
|
struct TransportCloseLabels {
|
||||||
/// Labels describing the TCP connection that closed.
|
transport: TransportLabels,
|
||||||
pub(super) transport: TransportLabels,
|
eos: Eos,
|
||||||
|
|
||||||
/// Was the transport closed successfully?
|
|
||||||
classification: Classification,
|
|
||||||
|
|
||||||
/// If `classification` == `Failure`, this may be set with the
|
|
||||||
/// OS error number describing the error, if there was one.
|
|
||||||
/// Otherwise, it should be `None`.
|
|
||||||
errno: Option<Errno>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub(super) struct CloseMetrics {
|
struct CloseMetrics {
|
||||||
close_total: Counter,
|
close_total: Counter,
|
||||||
connection_duration: Histogram<latency::Ms>,
|
connection_duration: Histogram<latency::Ms>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl OpenScopes =====
|
// ===== impl Transports =====
|
||||||
|
|
||||||
impl OpenScopes {
|
impl Transports {
|
||||||
metrics! {
|
pub fn open(&mut self, ctx: &ctx::transport::Ctx) {
|
||||||
tcp_open_total: Counter { "Total count of opened connections" },
|
let k = TransportLabels::new(ctx);
|
||||||
tcp_open_connections: Gauge { "Number of currently-open connections" },
|
let metrics = self.opens.get_or_default(k);
|
||||||
tcp_read_bytes_total: Counter { "Total count of bytes read from peers" },
|
metrics.open_total.incr();
|
||||||
tcp_write_bytes_total: Counter { "Total count of bytes written to peers" }
|
metrics.open_connections.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close(
|
||||||
|
&mut self,
|
||||||
|
ctx: &ctx::transport::Ctx,
|
||||||
|
eos: Eos,
|
||||||
|
duration: Duration,
|
||||||
|
rx: u64,
|
||||||
|
tx: u64,
|
||||||
|
) {
|
||||||
|
let key = TransportLabels::new(ctx);
|
||||||
|
|
||||||
|
let o = self.opens.get_or_default(key);
|
||||||
|
o.open_connections.decr();
|
||||||
|
o.read_bytes_total += rx;
|
||||||
|
o.write_bytes_total += tx;
|
||||||
|
|
||||||
|
let k = TransportCloseLabels::new(key, eos);
|
||||||
|
let c = self.closes.get_or_default(k);
|
||||||
|
c.close_total.incr();
|
||||||
|
c.connection_duration.add(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
|
||||||
|
self.opens
|
||||||
|
.get(&TransportLabels::new(ctx))
|
||||||
|
.map(|m| m.open_total.into())
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[cfg(test)]
|
||||||
|
// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
|
||||||
|
// self.metrics
|
||||||
|
// .get(&Key::from(ctx))
|
||||||
|
// .map(|m| m.open_connections.into())
|
||||||
|
// .unwrap_or(0)
|
||||||
|
// }
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
|
||||||
|
self.opens
|
||||||
|
.get(&TransportLabels::new(ctx))
|
||||||
|
.map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into()))
|
||||||
|
.unwrap_or((0, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
|
||||||
|
self.closes
|
||||||
|
.get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos))
|
||||||
|
.map(|m| m.close_total.into())
|
||||||
|
.unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
|
||||||
|
self.closes
|
||||||
|
.get(&TransportCloseLabels::new(TransportLabels::new(ctx), eos))
|
||||||
|
.map(|m| m.connection_duration.clone())
|
||||||
|
.unwrap_or_default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Transports {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
self.opens.fmt(f)?;
|
||||||
|
self.closes.fmt(f)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl OpenScopes =====
|
||||||
|
|
||||||
impl fmt::Display for OpenScopes {
|
impl fmt::Display for OpenScopes {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::tcp_open_total.fmt_help(f)?;
|
tcp_open_total.fmt_help(f)?;
|
||||||
Self::tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?;
|
tcp_open_total.fmt_scopes(f, self, |s| &s.open_total)?;
|
||||||
|
|
||||||
Self::tcp_open_connections.fmt_help(f)?;
|
tcp_open_connections.fmt_help(f)?;
|
||||||
Self::tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?;
|
tcp_open_connections.fmt_scopes(f, self, |s| &s.open_connections)?;
|
||||||
|
|
||||||
Self::tcp_read_bytes_total.fmt_help(f)?;
|
tcp_read_bytes_total.fmt_help(f)?;
|
||||||
Self::tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?;
|
tcp_read_bytes_total.fmt_scopes(f, self, |s| &s.read_bytes_total)?;
|
||||||
|
|
||||||
Self::tcp_write_bytes_total.fmt_help(f)?;
|
tcp_write_bytes_total.fmt_help(f)?;
|
||||||
Self::tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?;
|
tcp_write_bytes_total.fmt_scopes(f, self, |s| &s.write_bytes_total)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl OpenMetrics =====
|
|
||||||
|
|
||||||
impl OpenMetrics {
|
|
||||||
pub(super) fn open(&mut self) {
|
|
||||||
self.open_total.incr();
|
|
||||||
self.open_connections.incr();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn close(&mut self, rx: u64, tx: u64) {
|
|
||||||
self.open_connections.decr();
|
|
||||||
self.read_bytes_total += rx;
|
|
||||||
self.write_bytes_total += tx;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub(super) fn open_total(&self) -> u64 {
|
|
||||||
self.open_total.into()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub(super) fn read_bytes_total(&self) -> u64 {
|
|
||||||
self.read_bytes_total.into()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub(super) fn write_bytes_total(&self) -> u64 {
|
|
||||||
self.write_bytes_total.into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===== impl CloseScopes =====
|
// ===== impl CloseScopes =====
|
||||||
|
|
||||||
impl CloseScopes {
|
|
||||||
metrics! {
|
|
||||||
tcp_close_total: Counter { "Total count of closed connections" },
|
|
||||||
tcp_connection_duration_ms: Histogram<latency::Ms> { "Connection lifetimes" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for CloseScopes {
|
impl fmt::Display for CloseScopes {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::tcp_close_total.fmt_help(f)?;
|
tcp_close_total.fmt_help(f)?;
|
||||||
Self::tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?;
|
tcp_close_total.fmt_scopes(f, self, |s| &s.close_total)?;
|
||||||
|
|
||||||
Self::tcp_connection_duration_ms.fmt_help(f)?;
|
tcp_connection_duration_ms.fmt_help(f)?;
|
||||||
Self::tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?;
|
tcp_connection_duration_ms.fmt_scopes(f, self, |s| &s.connection_duration)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl CloseMetrics =====
|
|
||||||
|
|
||||||
impl CloseMetrics {
|
|
||||||
pub(super) fn close(&mut self, duration: Duration) {
|
|
||||||
self.close_total.incr();
|
|
||||||
self.connection_duration.add(duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub(super) fn close_total(&self) -> u64 {
|
|
||||||
self.close_total.into()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub(super) fn connection_duration(&self) -> &Histogram<latency::Ms> {
|
|
||||||
&self.connection_duration
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// ===== impl TransportLabels =====
|
// ===== impl TransportLabels =====
|
||||||
|
|
||||||
impl TransportLabels {
|
impl TransportLabels {
|
||||||
pub fn new(ctx: &ctx::transport::Ctx) -> Self {
|
fn new(ctx: &ctx::transport::Ctx) -> Self {
|
||||||
TransportLabels {
|
TransportLabels {
|
||||||
direction: Direction::from_context(ctx.proxy().as_ref()),
|
direction: Direction::from_context(ctx.proxy().as_ref()),
|
||||||
peer: match *ctx {
|
peer: match *ctx {
|
||||||
|
@ -181,11 +215,6 @@ impl TransportLabels {
|
||||||
tls_status: ctx.tls_status().into(),
|
tls_status: ctx.tls_status().into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn tls_status(&self) -> TlsStatus {
|
|
||||||
self.tls_status
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for TransportLabels {
|
impl fmt::Display for TransportLabels {
|
||||||
|
@ -206,36 +235,33 @@ impl fmt::Display for Peer {
|
||||||
// ===== impl TransportCloseLabels =====
|
// ===== impl TransportCloseLabels =====
|
||||||
|
|
||||||
impl TransportCloseLabels {
|
impl TransportCloseLabels {
|
||||||
pub fn new(ctx: &ctx::transport::Ctx,
|
fn new(transport: TransportLabels, eos: Eos) -> Self {
|
||||||
close: &event::TransportClose)
|
Self {
|
||||||
-> Self {
|
transport,
|
||||||
let classification = Classification::transport_close(close);
|
eos,
|
||||||
let errno = close.errno.map(|code| {
|
|
||||||
// If the error code is set, this should be classified
|
|
||||||
// as a failure!
|
|
||||||
debug_assert!(classification == Classification::Failure);
|
|
||||||
Errno::from(code)
|
|
||||||
});
|
|
||||||
TransportCloseLabels {
|
|
||||||
transport: TransportLabels::new(ctx),
|
|
||||||
classification,
|
|
||||||
errno,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn tls_status(&self) -> TlsStatus {
|
|
||||||
self.transport.tls_status()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for TransportCloseLabels {
|
impl fmt::Display for TransportCloseLabels {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
write!(f, "{},{}", self.transport, self.classification)?;
|
write!(f, "{},{}", self.transport, self.eos)
|
||||||
if let Some(errno) = self.errno {
|
|
||||||
write!(f, ",errno=\"{}\"", errno)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===== impl Eos =====
|
||||||
|
|
||||||
|
impl fmt::Display for Eos {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match &self {
|
||||||
|
Eos::Clean => f.pad("classification=\"success\""),
|
||||||
|
Eos::Error { errno } => {
|
||||||
|
f.pad("classification=\"failure\"")?;
|
||||||
|
if let Some(e) = errno {
|
||||||
|
write!(f, ",errno=\"{}\"", e)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue