Move telemetry::transport to transport::metrics (#85)

Following #84, the `telemetry::transport` module can be moved into the
`transport` module.

This should allow us to simplify type signatures by combining redundant
types. It's also hoped that we can reduce the API boilerplate around
metrics so it's much easier to instrument and track new metrics in
transport code.
This commit is contained in:
Oliver Gould 2018-08-24 15:09:36 -07:00 committed by GitHub
parent 8a9a9bf26b
commit 8ea9a3644d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 34 additions and 97 deletions

View File

@ -29,7 +29,7 @@ use watch_service::{WatchService, Rebind};
pub struct Bind<C, B> {
ctx: C,
sensors: telemetry::Sensors,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tls_client_config: tls::ClientConfigWatch,
_p: PhantomData<fn() -> B>,
}
@ -150,7 +150,7 @@ pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<Ht
pub type HttpRequest<B> = http::Request<telemetry::http::service::RequestBody<B>>;
pub type Client<B> = transparency::Client<
telemetry::transport::Connect<transport::Connect>,
transport::metrics::Connect<transport::Connect>,
::logging::ClientExecutor<&'static str, SocketAddr>,
B,
>;
@ -184,7 +184,7 @@ impl Error for BufferSpawnError {
impl<B> Bind<(), B> {
pub fn new(
sensors: telemetry::Sensors,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tls_client_config: tls::ClientConfigWatch
) -> Self {
Self {

View File

@ -116,7 +116,7 @@ mod tests {
fn new_inbound(default: Option<net::SocketAddr>, ctx: ctx::Proxy) -> Inbound<()> {
let bind = Bind::new(
::telemetry::Sensors::for_test(),
::telemetry::transport::Registry::default(),
::transport::metrics::Registry::default(),
tls::ClientConfig::no_tls()
);
Inbound::new(default, bind.with_ctx(ctx))

View File

@ -253,11 +253,18 @@ where
);
let (taps, observe) = control::Observe::new(100);
let (sensors, transport_registry, tls_config_sensor, metrics_server) = telemetry::new(
start_time,
config.metrics_retain_idle,
&taps,
);
let (http_sensors, http_report) = telemetry::http::new(config.metrics_retain_idle, &taps);
let (transport_registry, transport_report) = transport::metrics::new();
let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();
let report = telemetry::Report::new(
http_report,
transport_report,
tls_config_report,
telemetry::process::Report::new(start_time),
);
let tls_client_config = tls_config_watch.client.clone();
let tls_cfg_bg = tls_config_watch.start(tls_config_sensor);
@ -289,7 +296,7 @@ where
let (drain_tx, drain_rx) = drain::channel();
let bind = Bind::new(
sensors.clone(),
http_sensors.clone(),
transport_registry.clone(),
tls_client_config
);
@ -359,7 +366,7 @@ where
let metrics = control::serve_http(
"metrics",
metrics_listener,
metrics_server,
linkerd2_metrics::Serve::new(report),
);
rt.spawn(::logging::admin().bg("resolver").future(resolver_bg));
@ -402,7 +409,7 @@ fn serve<R, B, E, F, G>(
tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>,
proxy_ctx: ctx::Proxy,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
get_orig_dst: G,
drain_rx: drain::Watch,
) -> impl Future<Item = (), Error = io::Error> + Send + 'static

View File

@ -1,33 +1,15 @@
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use linkerd2_metrics as metrics;
mod errno;
pub mod http;
mod process;
pub mod process;
mod report;
pub mod tap;
pub mod tls_config_reload;
pub mod transport;
use self::errno::Errno;
pub use self::errno::Errno;
pub use self::http::event::Event;
pub use self::report::Report;
pub use self::http::Sensors;
pub type ServeMetrics = metrics::Serve<Report>;
pub fn new(
start_time: SystemTime,
metrics_retain_idle: Duration,
taps: &Arc<Mutex<tap::Taps>>,
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
let process = process::Report::new(start_time);
let (http_sensors, http_report) = http::new(metrics_retain_idle, taps);
let (transport_registry, transport_report) = transport::new();
let (tls_config_sensor, tls_config_report) = tls_config_reload::new();
let report = Report::new(http_report, transport_report, tls_config_report, process);
(http_sensors, transport_registry, tls_config_sensor, ServeMetrics::new(report))
}

View File

@ -1,6 +1,7 @@
use std::fmt;
use super::{http, process, tls_config_reload, transport};
use transport::metrics as transport;
use super::{http, process, tls_config_reload};
use super::metrics::FmtMetrics;
/// Implements `FmtMetrics` to report runtime metrics.
@ -15,7 +16,7 @@ pub struct Report {
// ===== impl Report =====
impl Report {
pub(super) fn new(
pub fn new(
http: http::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,

View File

@ -14,12 +14,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::NewService;
use tower_h2;
use transport::{Connection, Peek};
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
use drain;
use telemetry;
use transport::GetOriginalDst;
use transport::{self, Connection, GetOriginalDst, Peek};
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
use super::protocol::Protocol;
use super::tcp;
@ -47,7 +45,7 @@ where
listen_addr: SocketAddr,
new_service: S,
proxy_ctx: ProxyCtx,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
tcp: tcp::Proxy,
log: ::logging::Server,
}
@ -76,7 +74,7 @@ where
pub fn new(
listen_addr: SocketAddr,
proxy_ctx: ProxyCtx,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
get_orig_dst: G,
stack: S,
tcp_connect_timeout: Duration,

View File

@ -13,7 +13,6 @@ use ctx::transport::{
Client as ClientCtx,
Server as ServerCtx,
};
use telemetry;
use timeout::Timeout;
use transport::{self, tls};
use ctx::transport::TlsStatus;
@ -22,14 +21,14 @@ use ctx::transport::TlsStatus;
#[derive(Debug, Clone)]
pub struct Proxy {
connect_timeout: Duration,
transport_registry: telemetry::transport::Registry,
transport_registry: transport::metrics::Registry,
}
impl Proxy {
/// Create a new TCP `Proxy`.
pub fn new(
connect_timeout: Duration,
transport_registry: telemetry::transport::Registry
transport_registry: transport::metrics::Registry
) -> Self {
Self {
connect_timeout,

View File

@ -5,9 +5,7 @@ use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_connect;
use ctx;
use telemetry::Errno;
use telemetry::metrics::{
use linkerd2_metrics::{
latency,
Counter,
FmtLabels,
@ -17,6 +15,9 @@ use telemetry::metrics::{
Histogram,
Metric,
};
use ctx;
use telemetry::Errno;
use transport::Connection;
mod io;
@ -197,50 +198,6 @@ impl Registry {
}
}
#[cfg(test)]
impl Registry {
pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.map(|m| m.lock().unwrap().open_total.into())
.unwrap_or(0)
}
// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
// self.0.lock().unwrap().0
// .get(&Key::new(ctx))
// .map(|m| m.lock().unwrap().open_connections.into())
// .unwrap_or(0)
// }
pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.map(|m| {
let m = m.lock().unwrap();
(m.read_bytes_total.into(), m.write_bytes_total.into())
})
.unwrap_or((0, 0))
}
pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.and_then(move |m| m.lock().unwrap().by_eos.get(&eos).map(|m| m.close_total.into()))
.unwrap_or(0)
}
pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
self.0.lock().unwrap().0
.get(&Key::new(ctx))
.and_then(move |m| {
m.lock().unwrap().by_eos.get(&eos).map(|m| m.connection_duration.clone())
})
.unwrap_or_default()
}
}
// ===== impl Report =====
impl FmtMetrics for Report {
@ -343,14 +300,6 @@ impl NewSensor {
// ===== impl Key =====
impl Key {
#[cfg(test)]
fn new(ctx: &ctx::transport::Ctx) -> Self {
match ctx {
ctx::transport::Ctx::Client(ctx) => Self::client(ctx),
ctx::transport::Ctx::Server(ctx) => Self::server(ctx),
}
}
fn client(ctx: &ctx::transport::Client) -> Self {
Self {
proxy: ctx.proxy,

View File

@ -2,6 +2,7 @@ mod connect;
mod connection;
mod addr_info;
mod io;
pub mod metrics;
mod prefixed;
pub mod tls;