diff --git a/src/bind.rs b/src/bind.rs index ae8761c25..7b08f0711 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -29,7 +29,7 @@ use watch_service::{WatchService, Rebind}; pub struct Bind { ctx: C, sensors: telemetry::Sensors, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, tls_client_config: tls::ClientConfigWatch, _p: PhantomData B>, } @@ -150,7 +150,7 @@ pub type HttpResponse = http::Response = http::Request>; pub type Client = transparency::Client< - telemetry::transport::Connect, + transport::metrics::Connect, ::logging::ClientExecutor<&'static str, SocketAddr>, B, >; @@ -184,7 +184,7 @@ impl Error for BufferSpawnError { impl Bind<(), B> { pub fn new( sensors: telemetry::Sensors, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, tls_client_config: tls::ClientConfigWatch ) -> Self { Self { diff --git a/src/inbound.rs b/src/inbound.rs index 86e39a293..28b7fc765 100644 --- a/src/inbound.rs +++ b/src/inbound.rs @@ -116,7 +116,7 @@ mod tests { fn new_inbound(default: Option, ctx: ctx::Proxy) -> Inbound<()> { let bind = Bind::new( ::telemetry::Sensors::for_test(), - ::telemetry::transport::Registry::default(), + ::transport::metrics::Registry::default(), tls::ClientConfig::no_tls() ); Inbound::new(default, bind.with_ctx(ctx)) diff --git a/src/lib.rs b/src/lib.rs index c70549d82..9b29d6966 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,11 +253,18 @@ where ); let (taps, observe) = control::Observe::new(100); - let (sensors, transport_registry, tls_config_sensor, metrics_server) = telemetry::new( - start_time, - config.metrics_retain_idle, - &taps, - ); + let (http_sensors, http_report) = telemetry::http::new(config.metrics_retain_idle, &taps); + + let (transport_registry, transport_report) = transport::metrics::new(); + + let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new(); + + let report = telemetry::Report::new( + http_report, + transport_report, + tls_config_report, + telemetry::process::Report::new(start_time), + ); let tls_client_config = tls_config_watch.client.clone(); let tls_cfg_bg = tls_config_watch.start(tls_config_sensor); @@ -289,7 +296,7 @@ where let (drain_tx, drain_rx) = drain::channel(); let bind = Bind::new( - sensors.clone(), + http_sensors.clone(), transport_registry.clone(), tls_client_config ); @@ -359,7 +366,7 @@ where let metrics = control::serve_http( "metrics", metrics_listener, - metrics_server, + linkerd2_metrics::Serve::new(report), ); rt.spawn(::logging::admin().bg("resolver").future(resolver_bg)); @@ -402,7 +409,7 @@ fn serve( tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, proxy_ctx: ctx::Proxy, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, get_orig_dst: G, drain_rx: drain::Watch, ) -> impl Future + Send + 'static diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 4203d9c20..21fa095c1 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -1,33 +1,15 @@ -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; - use linkerd2_metrics as metrics; mod errno; pub mod http; -mod process; +pub mod process; mod report; pub mod tap; pub mod tls_config_reload; -pub mod transport; -use self::errno::Errno; +pub use self::errno::Errno; pub use self::http::event::Event; pub use self::report::Report; pub use self::http::Sensors; pub type ServeMetrics = metrics::Serve; - -pub fn new( - start_time: SystemTime, - metrics_retain_idle: Duration, - taps: &Arc>, -) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) { - let process = process::Report::new(start_time); - let (http_sensors, http_report) = http::new(metrics_retain_idle, taps); - let (transport_registry, transport_report) = transport::new(); - let (tls_config_sensor, tls_config_report) = tls_config_reload::new(); - - let report = Report::new(http_report, transport_report, tls_config_report, process); - (http_sensors, transport_registry, tls_config_sensor, ServeMetrics::new(report)) -} diff --git a/src/telemetry/report.rs b/src/telemetry/report.rs index aa092f6bb..2e3a77ee1 100644 --- a/src/telemetry/report.rs +++ b/src/telemetry/report.rs @@ -1,6 +1,7 @@ use std::fmt; -use super::{http, process, tls_config_reload, transport}; +use transport::metrics as transport; +use super::{http, process, tls_config_reload}; use super::metrics::FmtMetrics; /// Implements `FmtMetrics` to report runtime metrics. @@ -15,7 +16,7 @@ pub struct Report { // ===== impl Report ===== impl Report { - pub(super) fn new( + pub fn new( http: http::Report, transports: transport::Report, tls_config_reload: tls_config_reload::Report, diff --git a/src/transparency/server.rs b/src/transparency/server.rs index 11beb7760..96be651b2 100644 --- a/src/transparency/server.rs +++ b/src/transparency/server.rs @@ -14,12 +14,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::NewService; use tower_h2; -use transport::{Connection, Peek}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; -use telemetry; -use transport::GetOriginalDst; +use transport::{self, Connection, GetOriginalDst, Peek}; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use super::protocol::Protocol; use super::tcp; @@ -47,7 +45,7 @@ where listen_addr: SocketAddr, new_service: S, proxy_ctx: ProxyCtx, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, tcp: tcp::Proxy, log: ::logging::Server, } @@ -76,7 +74,7 @@ where pub fn new( listen_addr: SocketAddr, proxy_ctx: ProxyCtx, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, get_orig_dst: G, stack: S, tcp_connect_timeout: Duration, diff --git a/src/transparency/tcp.rs b/src/transparency/tcp.rs index 0040f5e6e..babe33a06 100644 --- a/src/transparency/tcp.rs +++ b/src/transparency/tcp.rs @@ -13,7 +13,6 @@ use ctx::transport::{ Client as ClientCtx, Server as ServerCtx, }; -use telemetry; use timeout::Timeout; use transport::{self, tls}; use ctx::transport::TlsStatus; @@ -22,14 +21,14 @@ use ctx::transport::TlsStatus; #[derive(Debug, Clone)] pub struct Proxy { connect_timeout: Duration, - transport_registry: telemetry::transport::Registry, + transport_registry: transport::metrics::Registry, } impl Proxy { /// Create a new TCP `Proxy`. pub fn new( connect_timeout: Duration, - transport_registry: telemetry::transport::Registry + transport_registry: transport::metrics::Registry ) -> Self { Self { connect_timeout, diff --git a/src/telemetry/transport/io.rs b/src/transport/metrics/io.rs similarity index 100% rename from src/telemetry/transport/io.rs rename to src/transport/metrics/io.rs diff --git a/src/telemetry/transport/mod.rs b/src/transport/metrics/mod.rs similarity index 84% rename from src/telemetry/transport/mod.rs rename to src/transport/metrics/mod.rs index c65a83bac..4edb6ec7b 100644 --- a/src/telemetry/transport/mod.rs +++ b/src/transport/metrics/mod.rs @@ -5,9 +5,7 @@ use std::time::Instant; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_connect; -use ctx; -use telemetry::Errno; -use telemetry::metrics::{ +use linkerd2_metrics::{ latency, Counter, FmtLabels, @@ -17,6 +15,9 @@ use telemetry::metrics::{ Histogram, Metric, }; + +use ctx; +use telemetry::Errno; use transport::Connection; mod io; @@ -197,50 +198,6 @@ impl Registry { } } -#[cfg(test)] -impl Registry { - - pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { - self.0.lock().unwrap().0 - .get(&Key::new(ctx)) - .map(|m| m.lock().unwrap().open_total.into()) - .unwrap_or(0) - } - - // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { - // self.0.lock().unwrap().0 - // .get(&Key::new(ctx)) - // .map(|m| m.lock().unwrap().open_connections.into()) - // .unwrap_or(0) - // } - - pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { - self.0.lock().unwrap().0 - .get(&Key::new(ctx)) - .map(|m| { - let m = m.lock().unwrap(); - (m.read_bytes_total.into(), m.write_bytes_total.into()) - }) - .unwrap_or((0, 0)) - } - - pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { - self.0.lock().unwrap().0 - .get(&Key::new(ctx)) - .and_then(move |m| m.lock().unwrap().by_eos.get(&eos).map(|m| m.close_total.into())) - .unwrap_or(0) - } - - pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { - self.0.lock().unwrap().0 - .get(&Key::new(ctx)) - .and_then(move |m| { - m.lock().unwrap().by_eos.get(&eos).map(|m| m.connection_duration.clone()) - }) - .unwrap_or_default() - } -} - // ===== impl Report ===== impl FmtMetrics for Report { @@ -343,14 +300,6 @@ impl NewSensor { // ===== impl Key ===== impl Key { - #[cfg(test)] - fn new(ctx: &ctx::transport::Ctx) -> Self { - match ctx { - ctx::transport::Ctx::Client(ctx) => Self::client(ctx), - ctx::transport::Ctx::Server(ctx) => Self::server(ctx), - } - } - fn client(ctx: &ctx::transport::Client) -> Self { Self { proxy: ctx.proxy, diff --git a/src/transport/mod.rs b/src/transport/mod.rs index d19589c35..f14bc7d7f 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -2,6 +2,7 @@ mod connect; mod connection; mod addr_info; mod io; +pub mod metrics; mod prefixed; pub mod tls;